#### **Sequence to Sequence Learning with Neural Networks (NIPS 2014)** 논문 구현

### **1. 데이터 불러오기**

* **spaCy 라이브러리**: 문장의 토큰화(tokenization), 태깅(tagging) 등의 전처리 기능을 위한 라이브러리

In [None]:
%%capture
!python -m spacy download en
!python -m spacy download de

In [None]:
import spacy
import de_core_news_sm
import en_core_web_sm

spacy_en = en_core_web_sm.load() ## 영어 토큰화(tokenization)
spacy_de = de_core_news_sm.load() ## 독일어 토큰화(tokenization)

### **2. 데이터 전처리**


In [None]:
## 간단히 토큰화(tokenization) 기능 써보기
tokenized = spacy_en.tokenizer("I am a graduate student.")

for i, token in enumerate(tokenized):
    print(f"인덱스 {i}: {token.text}")

인덱스 0: I
인덱스 1: am
인덱스 2: a
인덱스 3: graduate
인덱스 4: student
인덱스 5: .


* 영어(English) 및 독일어(Deutsch) **토큰화 함수** 정의
  - 독일어가 source , 영어가 target

In [None]:
## 독일어(Deutsch) 문장을 토큰화한 뒤에 순서를 뒤집는 함수
def tokenize_de(text):
    return [token.text for token in spacy_de.tokenizer(text)][::-1]

## 영어(English) 문장을 토큰화 하는 함수
def tokenize_en(text):
    return [token.text for token in spacy_en.tokenizer(text)]

///////////////////////////////////

In [None]:
## torchtext 의 버전을 맞춰주기 위해서 torch를 제거하고 다시 재설치!
## torch==1.10.0 , torchtext==0.11.0

import torchtext
torchtext.__version__

'0.11.0'

In [None]:
# import locale
# def getpreferredencoding(do_setlocale = True):
#     return "UTF-8"
# locale.getpreferredencoding = getpreferredencoding
# !pip uninstall torch

In [None]:
# !pip install torch==1.10.0

In [None]:
# !pip install torchtext==0.11.0

///////////////////////////////////

* **필드(field)** 라이브러리를 이용해 데이터셋에 대한 구체적인 전처리 내용을 명시합니다.
* 번역 목표
    * 소스(SRC): 독일어
    * 목표(TRG): 영어

In [None]:
from torchtext.legacy.data import Field , BucketIterator


SRC = Field(tokenize=tokenize_de, init_token="<sos>", eos_token="<eos>", lower=True)
TRG = Field(tokenize=tokenize_en, init_token="<sos>", eos_token="<eos>", lower=True)

### **2. 데이터 불러오기**
- 대표적인 영어-독어 번역 데이터 셋인 Multi30k 를 불러오기
  - Multi30k는 약 30,000개 데이터로 구성된 번역 데이터셋
  - 간단한 실습을 위해서 간단한 데이터셋을 불러옴!

In [None]:
from torchtext.legacy.datasets import Multi30k

train_dataset, valid_dataset, test_dataset = Multi30k.splits(exts=(".de", ".en"), fields=(SRC, TRG))

downloading training.tar.gz


100%|██████████| 1.21M/1.21M [00:01<00:00, 1.03MB/s]


downloading validation.tar.gz


100%|██████████| 46.3k/46.3k [00:00<00:00, 277kB/s]


downloading mmt_task1_test2016.tar.gz


100%|██████████| 66.2k/66.2k [00:00<00:00, 267kB/s]


In [None]:
print(f"학습 데이터셋(training dataset) 크기: {len(train_dataset.examples)}개")
print(f"평가 데이터셋(validation dataset) 크기: {len(valid_dataset.examples)}개")
print(f"테스트 데이터셋(testing dataset) 크기: {len(test_dataset.examples)}개")

학습 데이터셋(training dataset) 크기: 29000개
평가 데이터셋(validation dataset) 크기: 1014개
테스트 데이터셋(testing dataset) 크기: 1000개


In [None]:
## 학습 데이터를 임의로 출력
print(vars(train_dataset.examples[30])['src'])
print(vars(train_dataset.examples[30])['trg'])

['.', 'steht', 'urinal', 'einem', 'an', 'kaffee', 'tasse', 'einer', 'mit', 'der', ',', 'mann', 'ein']
['a', 'man', 'standing', 'at', 'a', 'urinal', 'with', 'a', 'coffee', 'cup', '.']


* **필드(field)** 객체의 **build_vocab** 메서드를 이용해 영어와 독어의 단어 사전을 생성
  * 입력 차원을 명시하기 위해서!
  * **최소 2번 이상** 등장한 단어만을 선택

In [None]:
SRC.build_vocab(train_dataset, min_freq=2)
TRG.build_vocab(train_dataset, min_freq=2)

print(f"len(SRC): {len(SRC.vocab)}")
print(f"len(TRG): {len(TRG.vocab)}")

len(SRC): 7853
len(TRG): 5893


In [None]:
print(TRG.vocab.stoi["abcabc"]) ## 없는 단어: 0
print(TRG.vocab.stoi[TRG.pad_token]) ## 패딩(padding): 1
print(TRG.vocab.stoi["<sos>"]) ## <sos>: 2
print(TRG.vocab.stoi["<eos>"]) ## <eos>: 3
print(TRG.vocab.stoi["hello"])
print(TRG.vocab.stoi["world"])

0
1
2
3
4112
1752


* 학습을 잘 진행하기 위해서 배치 단위로 각각의 문장을 묶어서 학습을 진행
  * 이를 위해서 BucketIterator를 사용

* **배치 크기(batch size)**: 128

In [None]:
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

BATCH_SIZE = 128

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_dataset, valid_dataset, test_dataset),
    batch_size=BATCH_SIZE,
    device=device)

In [None]:
## train_iterator에서 첫 번째 배치에 대한 정보만 확인해보기

for i, batch in enumerate(train_iterator):
    src = batch.src
    trg = batch.trg

    print(f"첫 번째 배치 크기: {src.shape}")

    ## 현재 배치에 있는 하나의 문장에 포함된 정보 출력
    for i in range(src.shape[0]):
        print(f"인덱스 {i}: {src[i][0].item()}")

    ## 첫 번째 배치만 확인
    break

첫 번째 배치 크기: torch.Size([30, 128])
인덱스 0: 2
인덱스 1: 4
인덱스 2: 53
인덱스 3: 353
인덱스 4: 6
인덱스 5: 7
인덱스 6: 17
인덱스 7: 9
인덱스 8: 16
인덱스 9: 8
인덱스 10: 10
인덱스 11: 13
인덱스 12: 5
인덱스 13: 9
인덱스 14: 41
인덱스 15: 18
인덱스 16: 3
인덱스 17: 1
인덱스 18: 1
인덱스 19: 1
인덱스 20: 1
인덱스 21: 1
인덱스 22: 1
인덱스 23: 1
인덱스 24: 1
인덱스 25: 1
인덱스 26: 1
인덱스 27: 1
인덱스 28: 1
인덱스 29: 1


### **3. 모델링**

* 인코더 디코더 구조는 거의 유사
  * 디코더에 FC layer 가 추가된다는게 차이점!

#### **3-1. 인코더(Encoder) 아키텍처**

* 주어진 소스 문장을 문맥 벡터(context vector)로 인코딩
* 하이퍼 파라미터(hyperparameter)
    * **input_dim**: 하나의 단어에 대한 원핫 인코딩 차원
    * **embed_dim**: 임베딩(embedding) 차원
    * **hidden_dim**: 히든 상태(hidden state) 차원
    * **n_layers**: RNN 레이어의 개수
    * **dropout_ratio**: 드롭아웃(dropout) 비율

In [None]:
import torch.nn as nn

## 인코더는 임베딩 레이어, LSTM 레이어 2가지로 구성됨
class Encoder(nn.Module):
    def __init__(self, input_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()

        ## 임베딩(embedding) 레이어
        self.embedding = nn.Embedding(input_dim, embed_dim)

        ## LSTM 레이어
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
        
        ## dropout
        self.dropout = nn.Dropout(dropout_ratio)

    ## 인코더는 소스 문장을 입력으로 받아 문맥 벡터(context vector)를 반환        
    def forward(self, src):
        embedded = self.dropout(self.embedding(src))

        outputs, (hidden, cell) = self.rnn(embedded)

        return hidden, cell

#### **3-2. 디코더(Decoder) 아키텍처**

* 주어진 문맥 벡터(context vector)를 타겟 문장으로 디코딩
* 하이퍼 파라미터(hyperparameter)
    * **input_dim**: 하나의 단어에 대한 원핫 인코딩 차원
    * **embed_dim**: 임베딩(embedding) 차원
    * **hidden_dim**: 히든 상태(hidden state) 차원
    * **n_layers**: RNN 레이어의 개수
    * **dropout_ratio**: 드롭아웃(dropout) 비율

In [None]:
## 디코더는 임베딩 레이어, LSTM 레이어, FC레이어 3가지로 구성

class Decoder(nn.Module):
    def __init__(self, output_dim, embed_dim, hidden_dim, n_layers, dropout_ratio):
        super().__init__()

        ## 임베딩(embedding) 레이어
        self.embedding = nn.Embedding(output_dim, embed_dim)

        ## LSTM 레이어
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, dropout=dropout_ratio)
        
        ## FC 레이어 (인코더와 다른 부분)
        self.output_dim = output_dim
        self.fc_out = nn.Linear(hidden_dim, output_dim)
        
        ## dropout
        self.dropout = nn.Dropout(dropout_ratio)

    ## 디코더는 현재까지 출력된 문장에 대한 정보를 입력으로 받아 타겟 문장을 반환     
    def forward(self, input, hidden, cell):
        input = input.unsqueeze(0)
        
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(0))
        
        ## (현재 출력 단어, 현재까지의 모든 단어의 정보, 현재까지의 모든 단어의 정보)
        return prediction, hidden, cell

#### **3-3. Seq2Seq 아키텍처**

* 앞서 정의한 인코더(encoder)와 디코더(decoder)를 가지고 있는 하나의 아키텍처
    * **인코더(encoder)**: 주어진 소스 문장을 문맥 벡터(context vector)로 인코딩
    * **디코더(decoder)**: 주어진 문맥 벡터(context vector)를 타겟 문장으로 디코딩
    * 디코더는 한 단어씩 넣어서 한 번씩 결과를 구함
* **Teacher forcing**: 디코더의 예측(prediction)을 다음 입력으로 사용하지 않고, 실제 목표 출력(ground-truth)을 다음 입력으로 사용하는 기법

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    ## 학습할 때는 완전한 형태의 소스 문장, 타겟 문장, teacher_forcing_ratio를 넣기
    def forward(self, src, trg, teacher_forcing_ratio=0.5):
      

        ## 먼저 인코더를 거쳐 문맥 벡터(context vector)를 추출
        hidden, cell = self.encoder(src)

        ## 디코더(decoder)의 최종 결과를 담을 텐서 객체 만들기
        trg_len = trg.shape[0] 
        batch_size = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim 
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)

        ## 첫 번째 입력은 항상 <sos> 토큰
        input = trg[0, :]

        ## 타겟 단어의 개수만큼 반복하여 디코더에 포워딩(forwarding)
        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)

            outputs[t] = output
            top1 = output.argmax(1)

            ## teacher_forcing_ratio
            teacher_force = random.random() < teacher_forcing_ratio
            ## 현재의 출력 결과를 다음 입력에서 넣기
            input = trg[t] if teacher_force else top1 
        
        return outputs

#### **4. 학습(Training)**

* 하이퍼 파라미터 설정 및 모델 초기화

In [None]:
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)

ENCODER_EMBED_DIM = 256
DECODER_EMBED_DIM = 256
HIDDEN_DIM = 512

## 논문에서는 4로 진행
N_LAYERS = 2

ENC_DROPOUT_RATIO = 0.5
DEC_DROPOUT_RATIO = 0.5

In [None]:
## 객체 초기화
enc = Encoder(INPUT_DIM, ENCODER_EMBED_DIM, HIDDEN_DIM, N_LAYERS, ENC_DROPOUT_RATIO)
dec = Decoder(OUTPUT_DIM, DECODER_EMBED_DIM, HIDDEN_DIM, N_LAYERS, DEC_DROPOUT_RATIO)

model = Seq2Seq(enc, dec, device).to(device)

In [None]:
def init_weights(m):
    for name, param in m.named_parameters():
        nn.init.uniform_(param.data, -0.08, 0.08)
        ## 모델 가중치 파라미터 초기화
model.apply(init_weights)

Seq2Seq(
  (encoder): Encoder(
    (embedding): Embedding(7853, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (dropout): Dropout(p=0.5, inplace=False)
  )
  (decoder): Decoder(
    (embedding): Embedding(5893, 256)
    (rnn): LSTM(256, 512, num_layers=2, dropout=0.5)
    (fc_out): Linear(in_features=512, out_features=5893, bias=True)
    (dropout): Dropout(p=0.5, inplace=False)
  )
)

* 학습 및 평가 함수 정의

In [None]:
import torch.optim as optim

## Adam optimizer로 학습 최적화
optimizer = optim.Adam(model.parameters())

TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

In [None]:
## 모델 학습(train) 함수

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    
   
    for i, batch in enumerate(iterator):
        src = batch.src
        trg = batch.trg
        
        optimizer.zero_grad()

        output = model(src, trg)
        output_dim = output.shape[-1]
        
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
      
        
        ## 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
        loss = criterion(output, trg)
        loss.backward()
        
        ## 기울기(gradient) clipping 진행
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        
        ## 파라미터 업데이트
        optimizer.step()
        
        ## 전체 손실 값 계산
        epoch_loss += loss.item()
        
    return epoch_loss / len(iterator)

In [None]:
## 모델 평가(evaluate) 함수

def evaluate(model, iterator, criterion):
    model.eval() 
    epoch_loss = 0
    
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.src
            trg = batch.trg

            ## 평가할 때 teacher forcing는 사용하지 않음
            output = model(src, trg, 0)
            output_dim = output.shape[-1]
            
      
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)

            ## 모델의 출력 결과와 타겟 문장을 비교하여 손실 계산
            loss = criterion(output, trg)

            ## 전체 손실 값 계산
            epoch_loss += loss.item()
        
    return epoch_loss / len(iterator)

* 학습 및 검증 진행
    * epoch : 20

In [None]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

In [None]:
import time
import math
import random

N_EPOCHS = 20
CLIP = 1
best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    start_time = time.time() ## 시작 시간 기록

    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)

    end_time = time.time() ## 종료 시간 기록
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    ## best 일 때 모델 저장
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'seq2seq.pt')

    print(f'Epoch: {epoch + 1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):.3f}')
    print(f'\tValidation Loss: {valid_loss:.3f} | Validation PPL: {math.exp(valid_loss):.3f}')

Epoch: 01 | Time: 0m 43s
	Train Loss: 5.050 | Train PPL: 155.999
	Validation Loss: 4.971 | Validation PPL: 144.231
Epoch: 02 | Time: 0m 43s
	Train Loss: 4.481 | Train PPL: 88.357
	Validation Loss: 4.710 | Validation PPL: 111.075
Epoch: 03 | Time: 0m 42s
	Train Loss: 4.186 | Train PPL: 65.732
	Validation Loss: 4.557 | Validation PPL: 95.276
Epoch: 04 | Time: 0m 42s
	Train Loss: 3.983 | Train PPL: 53.676
	Validation Loss: 4.498 | Validation PPL: 89.873
Epoch: 05 | Time: 0m 42s
	Train Loss: 3.819 | Train PPL: 45.540
	Validation Loss: 4.351 | Validation PPL: 77.562
Epoch: 06 | Time: 0m 42s
	Train Loss: 3.699 | Train PPL: 40.413
	Validation Loss: 4.286 | Validation PPL: 72.648
Epoch: 07 | Time: 0m 42s
	Train Loss: 3.593 | Train PPL: 36.337
	Validation Loss: 4.217 | Validation PPL: 67.862
Epoch: 08 | Time: 0m 42s
	Train Loss: 3.464 | Train PPL: 31.939
	Validation Loss: 4.212 | Validation PPL: 67.507
Epoch: 09 | Time: 0m 42s
	Train Loss: 3.347 | Train PPL: 28.405
	Validation Loss: 4.089 | Val

In [None]:
## 학습된 모델 저장
from google.colab import files

files.download('seq2seq.pt')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [None]:
## evaluate 진행

model.load_state_dict(torch.load('seq2seq.pt'))

test_loss = evaluate(model, test_iterator, criterion)

print(f'Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):.3f}')

Test Loss: 3.692 | Test PPL: 40.138


In [None]:
## 번역(translation) 함수
def translate_sentence(sentence, src_field, trg_field, model, device, max_len=50):
    model.eval()

    if isinstance(sentence, str):
        nlp = spacy.load('de')
        tokens = [token.text.lower() for token in nlp(sentence)]
    else:
        tokens = [token.lower() for token in sentence]

    ## 처음에 <sos> 토큰, 마지막에 <eos> 토큰 붙이기
    tokens = [src_field.init_token] + tokens + [src_field.eos_token]
    print(f"전체 소스 토큰: {tokens}")

    src_indexes = [src_field.vocab.stoi[token] for token in tokens]
    print(f"소스 문장 인덱스: {src_indexes}")

    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)

    ## 인코더(endocer)에 소스 문장을 넣어 문맥 벡터(context vector) 계산
    with torch.no_grad():
        hidden, cell = model.encoder(src_tensor)

    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]

    for i in range(max_len):
        ## 이전에 출력한 단어가 현재 단어로 입력
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)

        with torch.no_grad():
            output, hidden, cell = model.decoder(trg_tensor, hidden, cell)

        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)

        ## <eos>를 만나는 순간 break
        if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
            break

    ## 각 출력 단어 인덱스를 실제 단어로 변환 (itos -> index to string)
    trg_tokens = [trg_field.vocab.itos[i] for i in trg_indexes]

    ## 첫 번째 <sos>는 제외하고 출력 문장 반환
    return trg_tokens[1:]

In [None]:
example_idx = 10

src = vars(test_dataset.examples[example_idx])['src']
trg = vars(test_dataset.examples[example_idx])['trg']

print(f'소스 문장: {src}')
print(f'타겟 문장: {trg}')
print("모델 출력 결과:", " ".join(translate_sentence(src, SRC, TRG, model, device)))

소스 문장: ['.', 'freien', 'im', 'tag', 'schönen', 'einen', 'genießen', 'sohn', 'kleiner', 'ihr', 'und', 'mutter', 'eine']
타겟 문장: ['a', 'mother', 'and', 'her', 'young', 'song', 'enjoying', 'a', 'beautiful', 'day', 'outside', '.']
전체 소스 토큰: ['<sos>', '.', 'freien', 'im', 'tag', 'schönen', 'einen', 'genießen', 'sohn', 'kleiner', 'ihr', 'und', 'mutter', 'eine', '<eos>']
소스 문장 인덱스: [2, 4, 88, 20, 200, 780, 19, 565, 624, 70, 134, 10, 364, 8, 3]
모델 출력 결과: a mother and a boy are enjoying their day on a sunny day . <eos>


In [None]:
src = tokenize_de("Guten Abend.")

print(f'소스 문장: {src}')
print("모델 출력 결과:", " ".join(translate_sentence(src, SRC, TRG, model, device)))

소스 문장: ['.', 'Abend', 'Guten']
전체 소스 토큰: ['<sos>', '.', 'abend', 'guten', '<eos>']
소스 문장 인덱스: [2, 4, 1163, 3798, 3]
모델 출력 결과: a . <eos>
