번역이나 챗봇에 적용할 수 있는 seq2seq에 대해서 살펴본다.</br>
챗봇을 예로들면 사용자의 문장에 대해서 답변을 예측해서 돌려준다.</br>
기존 RNN은 입력이 시작되는 순간에 어떤 답변을 준비하는데,</br>
seq2seq 모델은 문장을 끝까지 입력받고나서 답변을 만든다.</br>

seq2seq 모델은 인코더 - 디코더 구조로 되어 있다.</br>
1. 입력된 seq를 어떤 벡터로 압축한다.
2. 압축된 벡터를 디코더에 전달한다.
3. 디코더에서는 인코더에서 만든 벡터와 start flag를 첫 셀의 hidden state로 넣어준다.
4. 첫번째 셀에서 나온 output을 문장의 첫번째 단어로 두고 두번째 셀의 입력으로 넣는다.
5. 이를 반복해서 디코더는 최종적으로 하나의 문장을 만든다.

<img src="../../images/seq2seq.png" width="700px" height="300px"/>

In [1]:
import random
import torch
import torch.nn as nn
import torch.optim as optim

torch.manual_seed(0)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

raw = ["I feel hungry.	나는 배가 고프다.",
       "Pytorch is very easy.	파이토치는 매우 쉽다.",
       "Pytorch is a framework for deep learning.	파이토치는 딥러닝을 위한 프레임워크이다.",
       "Pytorch is very clear to use.	파이토치는 사용하기 매우 직관적이다."]

# SOS => start of sentence
# 디코더가 첫번째 쉘의 입력으로 SOS를 받아야 한다.
SOS_token = 0
# EOS => end of sentence
# 문장의 최대 길이보다 작은 문장이 들어올때 EOS로 알려주어야 한다.
EOS_token = 1


class Vocab:
    def __init__(self):
        self.vocab2index = {"<SOS>": SOS_token, "<EOS>": EOS_token}
        self.index2vocab = {SOS_token: "<SOS>", EOS_token: "<EOS>"}
        self.vocab_count = {}
        self.n_vocab = len(self.vocab2index)

    def add_vocab(self, sentence):
        for word in sentence.split(" "):
            if word not in self.vocab2index:
                self.vocab2index[word] = self.n_vocab
                self.vocab_count[word] = 1
                self.index2vocab[self.n_vocab] = word
                self.n_vocab += 1
            else:
                self.vocab_count[word] += 1


def filter_pair(pair, source_max_length, target_max_length):
    return len(pair[0].split(" ")) < source_max_length and len(pair[1].split(" ")) < target_max_length


def preprocess(corpus, source_max_length, target_max_length):
    print("reading corpus...")
    pairs = []
    for line in corpus:
        pairs.append([s for s in line.strip().lower().split("\t")])
    print("Read {} sentence pairs".format(len(pairs)))

    pairs = [pair for pair in pairs if filter_pair(pair, source_max_length, target_max_length)]
    print("Trimmed to {} sentence pairs".format(len(pairs)))

    # 단어의 개수나 사전등의 데이터를 준비해준다.
    source_vocab = Vocab()
    target_vocab = Vocab()

    print("Counting words...")
    for pair in pairs:
        source_vocab.add_vocab(pair[0])
        target_vocab.add_vocab(pair[1])
    print("source vocab size =", source_vocab.n_vocab)
    print("target vocab size =", target_vocab.n_vocab)

    return pairs, source_vocab, target_vocab


class Encoder(nn.Module):
    # input_size 입력 테스트에 있는 단어의 개수이다.
    # hidden_size는 
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        # Embedding 레이어는 input_size 벡터를 hidden_size만큼 차원을 줄여준다.
        # Embedding은 내부적으로 input_size만큼의 원핫 벡터를 만들어서 변환한다.
        self.embedding = nn.Embedding(input_size, hidden_size)
        # GRU에는 차원이 줄여진 벡터를 사용해서 모델을 만든다.
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
        return x, hidden


class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        # Target Text의 단어들로 복원을 하기 위해서 차원 변환한다.(실제로는 훨씬더 복잡하고 정교하게 한다.)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
        x = self.softmax(self.out(x[0]))
        return x, hidden

# 문장을 입력 받아서 문장을 원핫 벡터로 만든고, 이걸 다시 텐서로 바꺼준다.
def tensorize(vocab, sentence):
    indexes = [vocab.vocab2index[word] for word in sentence.split(" ")]
    indexes.append(vocab.vocab2index["<EOS>"])
    return torch.Tensor(indexes).long().to(device).view(-1, 1)


def train(pairs, source_vocab, target_vocab, encoder, decoder, n_iter, print_every=1000, learning_rate=0.01):
    loss_total = 0

    encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate)

    # 전체 데이터에서 필요한 만큼 랜덤하게 데이터를 추출한다.
    training_batch = [random.choice(pairs) for _ in range(n_iter)]
    # source text와 target text로 나눈다.
    training_source = [tensorize(source_vocab, pair[0]) for pair in training_batch]
    training_target = [tensorize(target_vocab, pair[1]) for pair in training_batch]

    # NLLLoss는 일반적으로 카테고리 값끼리 비교할때 많이 사용하는 loss이다.     
    criterion = nn.NLLLoss()

    for i in range(1, n_iter + 1):
        source_tensor = training_source[i - 1]
        target_tensor = training_target[i - 1]

        # 최초에는 hidden state가 없으므로 0 벡터를 만든다.
        encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device)

        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        source_length = source_tensor.size(0)
        target_length = target_tensor.size(0)

        loss = 0

        # 문장이 끝날때까지 루프를 돌면서 인코더의 hidden state를 꺼내온다.
        for enc_input in range(source_length):
            _, encoder_hidden = encoder(source_tensor[enc_input], encoder_hidden)

        decoder_input = torch.Tensor([[SOS_token]]).long().to(device)
        # 인코더의 마지막 hidden state를 디코터의 hidden state으로 설정한다.
        decoder_hidden = encoder_hidden

        for di in range(target_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            loss += criterion(decoder_output, target_tensor[di])
            # GRU의 예측값을 다음 셀의 입력으로 사용하지 않고, 이미 알고있는 예측값을 넣어주는 방식을 사용한다.
            decoder_input = target_tensor[di]  # teacher forcing

        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

        loss_iter = loss.item() / target_length
        loss_total += loss_iter

        if i % print_every == 0:
            loss_avg = loss_total / print_every
            loss_total = 0
            print("[{} - {}%] loss = {:05.4f}".format(i, i / n_iter * 100, loss_avg))


def evaluate(pairs, source_vocab, target_vocab, encoder, decoder, target_max_length):
    for pair in pairs:
        print(">", pair[0])
        print("=", pair[1])
        source_tensor = tensorize(source_vocab, pair[0])
        source_length = source_tensor.size()[0]
        encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device)

        for ei in range(source_length):
            _, encoder_hidden = encoder(source_tensor[ei], encoder_hidden)

        decoder_input = torch.Tensor([[SOS_token]], device=device).long()
        decoder_hidden = encoder_hidden
        decoded_words = []

        for di in range(target_max_length):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            _, top_index = decoder_output.data.topk(1)
            if top_index.item() == EOS_token:
                decoded_words.append("<EOS>")
                break
            else:
                decoded_words.append(target_vocab.index2vocab[top_index.item()])

            decoder_input = top_index.squeeze().detach()

        predict_words = decoded_words
        predict_sentence = " ".join(predict_words)
        print("<", predict_sentence)
        print("")

# 번역 task를 수행하는 seq2seq를 구현한 모델

SOURCE_MAX_LENGTH = 10  # 번역할 영어문장의 최대 길이 제한
TARGET_MAX_LENGTH = 12  # 번역한 한글문장의 최대 길이 제한

# preprocess 함수는 source: 영어 -> target: 한국어 데이터셋을 학습셋과 테스트셋으로 나눈다.
load_pairs, load_source_vocab, load_target_vocab = preprocess(raw, SOURCE_MAX_LENGTH, TARGET_MAX_LENGTH)
print(random.choice(load_pairs))

# 인코더와 디코더의 hidden state 크기를 같은 크기로 정의한다.
enc_hidden_size = 16
dec_hidden_size = enc_hidden_size

# Encoder와 Decoder라는 RNN 모델을 선언한다.
enc = Encoder(load_source_vocab.n_vocab, enc_hidden_size).to(device)
dec = Decoder(dec_hidden_size, load_target_vocab.n_vocab).to(device)

# train 함수는 Encoder의 출력을 Decoder의 첫입력으로 연결해서 학습한다.
train(load_pairs, load_source_vocab, load_target_vocab, enc, dec, 5000, print_every=1000)
# evaluate 함수는 학습이 완료된 것을 평가한다.
evaluate(load_pairs, load_source_vocab, load_target_vocab, enc, dec, TARGET_MAX_LENGTH)


reading corpus...
Read 4 sentence pairs
Trimmed to 4 sentence pairs
Counting words...
source vocab size = 17
target vocab size = 13
['i feel hungry.', '나는 배가 고프다.']
[1000 - 20.0%] loss = 0.7369
[2000 - 40.0%] loss = 0.1067
[3000 - 60.0%] loss = 0.0334
[4000 - 80.0%] loss = 0.0183
[5000 - 100.0%] loss = 0.0126
> i feel hungry.
= 나는 배가 고프다.
< 나는 배가 고프다. <EOS>

> pytorch is very easy.
= 파이토치는 매우 쉽다.
< 파이토치는 매우 쉽다. <EOS>

> pytorch is a framework for deep learning.
= 파이토치는 딥러닝을 위한 프레임워크이다.
< 파이토치는 딥러닝을 위한 프레임워크이다. <EOS>

> pytorch is very clear to use.
= 파이토치는 사용하기 매우 직관적이다.
< 파이토치는 사용하기 매우 직관적이다. <EOS>

