## Attention

- 어텐션(attention)은 입력에 대한 벡터 변환을 인코더(encoder)에서 처리하고 모든 벡터를 디코더(decoder)로 보냄.
이렇게 모든 벡터를 전달하는 것은 시간이 흐를수록 초기 정보를 잃어버리는 `기울기 소실`문제를 해결하기 위함임. 
그러나 모든 벡터가 전달됨으로써 행렬 크기가 매우 커지는 단점이 있는데, 이를 해결하기 위해 `소프트맥스 함수`를 사용하여
가중합을 구하고, 그 값을 디코더에 전달함.

- 가중합이 전달되면서, 정보를 많이 전달받은 디코더에게 부담이 가기 때문에, 디코더는 은닉 상태에 대하여
중점적으로 `집중(attention)`해 보아야 할 벡터를 소프트맥스 함수로 점수매긴 후 각각을 은닉 상태 벡터들과 곱함.
그리고 이 은닉 상태를 모두 더하여 하나의 값으로 만듦. 즉, 어텐션은 모든 벡터 중 꼭 살펴봐야 할 벡터에 집중하겠다는 의미임.

## Transformer
- 트랜스포머는 어텐션을 극대화하는 방법으로, 인코더와 디코더를 여러개 중첩시킨 구조임.
이 때 각각의 인코더와 디코더를 `block`이라고 함(논문에서는 인코더와 디코더 블록을 6개씩 중첩한 구조 사용)

- 하나의 인코더는 `self-attention`과 `전방향 신경망(feed-forward neural network)`으로 구성되어 있음.
인코더에서는 단어를 벡터로 임베딩하며, 이를 셀프 어텐션과 전방향 신경망으로 전달함.
이 때 셀프 어텐션은 문장에서 각 단어끼리 얼마나 관계하는지를 계산해서 반영함. 즉, 셀프 어텐션으로
문장 안에서 단어 간 관계를 파악할 수 있음. 셀프 어텐션에서 파악된 단어간 관계는 전방향 신경망으로 전달됨

- 디코더는 층을 총 3개 가지는데, 인코더에서 넘어온 벡터가 처음 만나는 것이 self-attention 층임.(인코더와 동일)
셀프 어텐션 층을 지나면 인코더-디코더 어텐션 층이 있음. 이 층에서는 인코더가 처리한 정보를 받아 어텐션 메커니즘을 수행하고,
마지막으로 전방향 신경망으로 데이터가 전달됨.

### 어텐션 메커니즘
- 어텐션 메커니즘을 이용하기 위해서는 가장 먼저 `어텐션 스코어`를 구해야 함
- 어텐션 스코어란, 현 디코더의 시점 i에서 단어를 예측하기 위해, 인코더의 모든 은닉상태 값($h_j$)이
디코더의 현 시점 은닉 상태($s_i$)와 얼마나 유사한지(관련이 있는지)를 판단하는 값임. 따라서,
어텐션 스코어는 앞 수식처럼 인코더의 모든 은닉 상태 값($h_j$)과 디코더에서 이전 시점 은닉상태($s_{i-1}$)
값을 이용하여 구할 수 있음
- 어텐션 스코어가 계산되면, 이 값을 소프트맥스 함수에 적용하여 확률로 변환하고,
이렇게 계산된 0 ~ 1 사이의 값들이 특정 시점(timestep)에 대한 가중치, 즉 시간의 가중치가 됨.
- 시간의 가중치($a_{ij}$)와 은닉 상태($h_j$)의 가중합을 계산하면 하나의 벡터가 계산되는데, 이것이 컨텍스트 벡터(context vector)임.
- 마지막으로 디코더의 은닉 상태를 구하는데, 이를 위해 컨텍스트 벡터와 디코더 이전 시점의 은닉 상태와 출력이 필요함.

## Seq2seq
- `seq2seq(sequence to seqeunce)`는 입력 시퀀스에 대한 출력 시퀀스를 만들기 위한 모델

In [4]:
# from __future__ import unicode_literals, print_function, divison
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd
import os

import re
import random

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [16]:
SOS_token = 0
EOS_token = 1
MAX_LENGTH = 20

class Lang :
    def __init__(self) :
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0 : 'SOS' , 1 : 'EOS'}
        self.n_words = 2
        
    def addSentence(self, sentence) :
        for word in sentence.split(' ') :
            self.addWord(word)
            
    def addWord(self, word) :
        if word not in self.word2index :
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
            
        else :
            self.word2count[word] += 1

In [39]:
def normalize_string(df, lang) :
    sent = df[lang].str.lower()
    sent = sent.str.replace('[^A-Za-z\s]+', ' ')
    sent = sent.str.normalize('NFD') # 유니코드 정규화
    sent = sent.str.encode('ascii', errors='ignore').str.decode('utf-8')
    return sent

def read_sentence(df, lang1, lang2) :
    sent1 = normalize_string(df, lang1)
    sent2 = normalize_string(df, lang2)
    return sent1, sent2

def read_file(loc, lang1, lang2) :
    df = pd.read_csv(loc, delimiter='\t', header=None, names=[lang1, lang2, 'etc'])
    return df

def process_data(lang1, lang2) :
    df = read_file(f'./{lang2}-{lang1}/{lang2}.txt', lang1, lang2)
    sent1, sent2 = read_sentence(df, lang1, lang2)
    
    input_lang = Lang()
    output_lang = Lang()
    pairs = []
    
    for i in range(len(df)) :
        if len(sent1[i].split(' ')) < MAX_LENGTH and len(sent2[i].split(' ')) < MAX_LENGTH : 
            full = [sent1[i], sent2[i]]
            input_lang.addSentence(sent1[i])
            output_lang.addSentence(sent2[i])
            pairs.append(full)
            
    return input_lang, output_lang, pairs

In [40]:
# 텐서로 변환
def indexesFromSentence(lang, sentence)  :
    return [lang.word2index[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence) :
    indexes = indexesFromSentence(lang, sentence)
    indexes.append(EOS_token)
    return torch.tensor(indexes, dtype=torch.long, device=device).view(-1, 1)

def tensorsFromPair(input_lang, output_lang, pair) :
    input_tensor = tensorFromSentence(input_lang, pair[0])
    target_tensor = tensorFromSentence(output_lang, pair[1])
    return (input_tensor, target_tensor)

In [65]:
class Encoder(nn.Module) :
    def __init__(self, input_dim, hidden_dim, embed_dim, num_layers) :
        super(Encoder, self).__init__()
        self.input_dim = input_dim # 인코더 입력층
        self.embed_dim = embed_dim # 인코더 임베딩 
        self.hidden_dim = hidden_dim # 인코더 은닉층
        self.num_layers = num_layers # 인코더 GRU 계층수
        self.embedding = nn.Embedding(input_dim, self.embed_dim) # 임베딩 계층 초기화
        self.gru = nn.GRU(self.embed_dim, self.hidden_dim, num_layers=self.num_layers)
        
    def forward(self, src) :
        embed = self.embedding(src).view(1, 1, -1)
        outputs, hidden = self.gru(embed)
        return outputs, hidden

    # 임베딩 계층에서는 출력하기 위해 딕셔너리 조회 테이블을 만들며, GRU 계층은 다음 단어 예측을 위한 확률을 계산함
    # 선형 계층에서는 계산된 확률값 중 최적값(최종 출력 단어) 선택을 위해 소프트맥스 활성화 함수를 사용
    
class Decoder(nn.Module) :
    def __init__(self, output_dim, hidden_dim, embed_dim, num_layers) :
        super(Decoder, self).__init__()
        
        self.embed_dim = embed_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_layers = num_layers
        
        self.embedding = nn.Embedding(output_dim, self.embed_dim)
        self.gru = nn.GRU(self.embed_dim, self.hidden_dim, num_layers=self.num_layers)
        self.out = nn.Linear(self.hidden_dim, output_dim)
        self.softmax = nn.LogSoftmax(dim=1)
        
    def forward(self, input, hidden) :
        input = input.view(1, -1)
        embed = F.relu(self.embedding(input))
        output, hidden = self.gru(embed, hidden)
        prediction = self.softmax(self.out(output[0]))
        return prediction, hidden

In [73]:
class Seq2seq(nn.Module) :
    def __init__(self, encoder, decoder, device, max_length=MAX_LENGTH) :
        super().__init__()
        
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
        
    def forward(self, input_lang, output_lang, teacher_forcing_ratio=0.5) :
        input_length = input_lang.size(0)
        batch_size = output_lang.shape[1]
        target_length = output_lang.shape[0]
        vocab_size = self.decoder.output_dim
        outputs = torch.zeros(target_length, batch_size, vocab_size).to(self.device)
        
        for i in range(input_length) :
            encoder_output, encoder_hidden = self.encoder(input_lang[i]) # 문장 모든 단어 인코딩
        decoder_hidden = encoder_hidden.to(device) # 인코더 은닉층을 디코더 은닉층으로 사용
        decoder_input = torch.tensor([SOS_token], device=device) # 첫 예측단어 앞에 토큰(SOS) 추가
        
        for t in range(target_length) : # 현 단어에서 출력 단어 예측
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
            outputs[t] = decoder_output
            teacher_force = random.random() < teacher_forcing_ratio
            topv, topi = decoder_output.topk(1)
            input = (output_lang[t] if teacher_force else topi)
            if (teacher_force == False and input.item() == EOS_token) :
                break
        
        return outputs

In [74]:
# 오차계산함수 정의
teacher_forcing_ratio = .5

def Model(model, input_tensor, target_tensor, model_optimizer, criterion) :
    model_optimizer.zero_grad()
    input_length = input_tensor.size(0)
    loss = 0
    epoch_loss = 0
    output = model(input_tensor, target_tensor)
    num_iter = output.size(0)
    
    for ot in range(num_iter) :
        loss += criterion(output[ot], target_tensor[ot])
    loss.backward()
    model_optimizer.step()
    epoch_loss = loss.item() / num_iter
    return epoch_loss

In [75]:
# 모델 훈련함수 정의
def trainModel(model, input_lang, output_lang, pairs, num_iter=20000) :
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    criterion = nn.NLLLoss()
    total_loss_iter = 0
    
    training_pairs = [tensorsFromPair(input_lang, output_lang, random.choice(pairs)) for i in range(num_iter)]
    
    for iter in range(1, num_iter+1) :
        training_pair = training_pairs[iter - 1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]
        loss = Model(model, input_tensor, target_tensor, optimizer, criterion)
        total_loss_iter += loss
        
        if iter % 5000 == 0 :
            average_loss = total_loss_iter / 5000
            total_loss_iter = 0
            print(f'{iter} | {average_loss:.4f}')
    
    torch.save(model.state_dict(), './custom_model.pt')
    return model

In [86]:
# 모델 평가
def evaluate(model, input_lang, output_lang, sentences, max_length=MAX_LENGTH) :
    with torch.no_grad() :
        input_tensor = tensorFromSentence(input_lang, sentences[0])
        output_tensor = tensorFromSentence(output_lang, sentences[1])
        decoded_words = []
        output = model(input_tensor, output_tensor)
        
        for ot in range(output.size(0)) :
            topv, topi = output[ot].topk(1)
            
            if topi[0].item() == EOS_token :
                decoded_words.append('<EOS>')
                break
            else :
                decoded_words.append(output_lang.index2word[topi[0].item()])
        print(decoded_words)
        return decoded_words
    
    
def evaluateRandomly(model, input_lang, output_lang, pairs, n=10) :
    for i in range(n) :
        pair = random.choice(pairs)
        print(f'input : {pair[0]}')
        print(f'output : {pair[1]}') 
        output_words = evaluate(model, input_lang, output_lang, pair)
        output_sentences = ' '.join(output_words)
        print(f'predicted : {output_sentences}')

In [92]:
# 모델 훈련
lang1 = 'eng'
lang2 = 'fra'

input_lang, output_lang, pairs = process_data(lang1, lang2)

  This is separate from the ipykernel package so we can avoid doing imports until


In [93]:
randomize = random.choice(pairs)
print('random sentence :',randomize)

input_size = input_lang.n_words
output_size = output_lang.n_words
print(f'Input : {input_size} Output : {output_size}')

embed_size = 256
hidden_size = 512
num_layers = 1
num_iteration = 75000

encoder = Encoder(input_size, hidden_size, embed_size, num_layers)
decoder = Decoder(output_size, hidden_size, embed_size, num_layers)

random sentence : ['how many spoonfuls of sugar do you usually put in your tea ', 'combien de cuill res de sucre mettez vous g n ralement dans votre th  ']
Input : 14732 Output : 20929


In [None]:
model = Seq2seq(encoder, decoder, device).to(device)

print(encoder)
print(decoder)

model = trainModel(model, input_lang, output_lang, pairs, num_iteration)

Encoder(
  (embedding): Embedding(14732, 256)
  (gru): GRU(256, 512)
)
Decoder(
  (embedding): Embedding(20929, 256)
  (gru): GRU(256, 512)
  (out): Linear(in_features=512, out_features=20929, bias=True)
  (softmax): LogSoftmax(dim=1)
)


In [None]:
# 임의의 문장 평가
evaluateRandomly(model, input_lang, output_lang, pairs)

---

In [None]:
# 어텐션 적용 디코더
class AttnDecoderRNN(nn.Module) :
    def __init__(self, hidden_size, output_size, dropout_p=0.1, max_length=MAX_LENGTH) :
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.max_length = max_length
ㅣ
        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size*2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size*2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)
    
    def forward(self, input, hidden, encoder_outputs) :
        embedded = self.embedding(input).view(1, 1, -1)
        embedded = self.dropout(embedded)
        
        attn_weights = F.softmax(self.attn(torch.cat((embedded[0], hidden[0], 1))), dim=1)
        attn_applied = torch.bmm(attn_weights.unsqueeze(0),
                                encoder_outputs.unsqueeze(0))
        output = torch.cat((embedded[0], attn_applied[0]), 1)
        output = self.attn_combine(output).unsqueeze(0)
        
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        
        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights        

In [None]:
# 어텐션 디코더 모델 학습
def train_iters(encoder, decoder, n_iters, print_every=1000, plot_every=100, lr=0.01) :
    start = time.time()
    plot_losses = []
    print_loss_total = 0
    plot_loss_total = 0
    
    encoder_optimizer = optim.SGD(encoder.parameters(), lr=lr)
    decoder_optimizer = optim.SGD(encoder.parameters(), lr=lr)
    training_pairs = [tensorsFromPair(input_lang, output_lang, random.choice(pairs)) for i in range(n_iters)]
    criterion = nn.NLLLoss()
    
    for iter in range(1, n_iters + 1) :
        training_pair = training_pairs[iter-1]
        input_tensor = training_pair[0]
        target_tensor = training_pair[1]
        loss = Model(model, input_tensor, target_tensor, decoder_optimizer, criterion)
        print_loss_total += loss
        plot_loss_total += loss
        
        if iter % 5000 == 0 :
            print_loss_avg = print_loss_total // 5000
            print_loss_total = 0
            print(f'{iter} | {print_loss_avg:.4f}')

In [None]:
# 어텐션 디코더 모델 훈련
import time

embed_size =256
hidden_size = 512
num_layers = 1
input_size = input_lang.n_words
output_size = output_lang.n_words

encoder1= Encoder(input_size, hidden_size, embed_size, num_layers)
attn_decoder1 = AttnDecoderRNN(hidden_size, output_size, dropout_p=0.1).to(device)
print(encoder1)
print(attn_decoder1)

attn_model = train_iters(encoder1, attn_decoder1, 75000, print_every=5000,
                        plot_every=100, lr=0.01)

In [None]:
evaluateRandomly(attn_model, input_lang, output_lang, pairs)