# 전처리

텍스트 파일을 읽고, 영어와 프랑스어를 분리하여 토큰화한다.

이후 전처리 과정을 통해서 인덱스를 부여한다.

다음과 같은 특수한 인덱스 값이 있음에 주의한다.

`UNK` 0

`PAD` 1

`SOS` 2

`EOS` 3

In [16]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import time
import nltk.translate.bleu_score as bleu
import random

In [21]:
#####################################################
# Sung Hyeon Kim's Code (Assisted by Bong Won Jang)
# - 2020 06 15 22:28 ☑️
#####################################################

def tokenize(path_name, max_lines=100):
    """Read a tab-separated parallel corpus and split every sentence into tokens.

    Each non-blank line must contain a source sentence and a target sentence
    separated by a tab. Additional tab-separated fields (for example an
    attribution column) are ignored. Sentences are tokenized by whitespace.

    Args:
        path_name: Path of the UTF-8 encoded text file to read.
        max_lines: How many leading lines of the file to consider; blank lines
            count towards this limit. ``None`` reads the whole file. The
            default of 100 matches the limit that used to be hard-coded.

    Returns:
        A tuple ``(source_texts, target_texts, target_labels)`` of three
        parallel lists, one entry per sentence pair:

        * ``source_texts``  - source tokens, e.g. ``['come', 'on', '!']``
        * ``target_texts``  - decoder input, e.g. ``['<sos>', 'allez', '!', '<eos>']``
        * ``target_labels`` - decoder labels, e.g. ``['allez', '!', '<eos>']``

    Raises:
        ValueError: If a non-blank line does not contain a tab separator.
    """
    source_texts = []
    target_texts = []
    target_labels = []

    # Stream the file so only the requested prefix is ever read.
    with open(path_name, 'r', encoding='utf-8') as f:
        for line_number, raw_line in enumerate(f, start=1):
            if max_lines is not None and line_number > max_lines:
                break

            line = raw_line.rstrip('\n')
            # Skip blank lines instead of stopping, so a stray empty line in
            # the middle of the corpus does not silently drop the rest.
            if not line.strip():
                continue

            fields = line.split('\t')
            if len(fields) < 2:
                raise ValueError(
                    "line {} of {!r} has no tab-separated target sentence".format(
                        line_number, path_name))

            source_tokens = fields[0].strip().split()
            target_tokens = fields[1].strip().split()

            source_texts.append(source_tokens)
            target_texts.append(['<sos>'] + target_tokens + ['<eos>'])
            target_labels.append(target_tokens + ['<eos>'])

    return source_texts, target_texts, target_labels

def preprocess(tokenize_texts):
    """Build word<->index vocabularies from tokenized sentences.

    The four special tokens are always registered first with fixed indices
    (``<unk>``=0, ``<pad>``=1, ``<sos>``=2, ``<eos>``=3). Every other token
    receives the next free index in order of first appearance.

    Args:
        tokenize_texts: Iterable of sentences, each an iterable of tokens.

    Returns:
        A tuple ``(word2index, index2word)`` of mutually inverse dicts.
    """
    # Reserved entries come first so their indices never move.
    word2index = {'<unk>': 0, '<pad>': 1, '<sos>': 2, '<eos>': 3}
    index2word = {index: token for token, index in word2index.items()}

    for sentence in tokenize_texts:
        for token in sentence:
            if token in word2index:
                continue
            # Indices are contiguous, so the current size is the next free one.
            next_index = len(word2index)
            word2index[token] = next_index
            index2word[next_index] = token

    return word2index, index2word

def wordtext2indtext(word_texts, word2ind):
    """Convert tokenized sentences into sentences of vocabulary indices.

    Words missing from ``word2ind`` are mapped to the index of the ``<unk>``
    token when the vocabulary defines one, so sentences containing unseen
    words can still be encoded.

    Args:
        word_texts: Iterable of sentences, each an iterable of tokens.
        word2ind: Mapping from token to integer index.

    Returns:
        A list with one list of indices per input sentence.

    Raises:
        KeyError: If a word is unknown and ``word2ind`` has no ``<unk>`` entry.
    """
    unk_index = word2ind.get('<unk>')

    def lookup(word):
        index = word2ind.get(word, unk_index)
        if index is None:
            # No <unk> fallback available: report the offending word.
            raise KeyError(word)
        return index

    return [[lookup(word) for word in word_text] for word_text in word_texts]

# 모델

LSTM 기반의 `Encoder`와, 인코더 은닉 상태들에 대한 내적(dot-product) Attention을 적용한 LSTM 기반의 `Decoder`를 구현한다.

In [6]:
############################################
#   Encoder
#
#   Encoder for seq2seq model with attention mechanism
#   This Encoder is based on a LSTM structure
############################################
class Encoder(nn.Module):
    """Single-layer LSTM encoder that consumes one source word per call.

    Attributes:
        input_size: Size of the source (English) vocabulary.
        hidden_size: Size of the embedding, hidden and output vectors.
        cell_size: Size of the LSTM cell vector (equal to ``hidden_size``).
        embedding_matrix: ``nn.Embedding`` of shape (input_size, hidden_size).
        lstm: ``nn.LSTM`` mapping hidden_size -> hidden_size.
    """

    def __init__(self, input_size, hidden_size):
        """Create the embedding table and the LSTM.

        Args:
            input_size: Size of the source word vocabulary.
            hidden_size: Size of the hidden vector and of the cell vector.
        """
        super(Encoder, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.cell_size = hidden_size

        self.embedding_matrix = nn.Embedding(self.input_size, self.hidden_size)
        self.lstm = nn.LSTM(self.hidden_size, self.hidden_size)

    def forward(self, word_num, hidden, cell):
        """Run the LSTM for a single word.

        Args:
            word_num: Index of the source word (an int or a one-element
                index tensor).
            hidden: Previous hidden state of shape (1, 1, hidden_size).
            cell: Previous cell state of shape (1, 1, hidden_size).

        Returns:
            ``(o, (hn, cn))`` where ``o`` is the LSTM output and ``hn``/``cn``
            are the next hidden/cell states, each of shape (1, 1, hidden_size).
        """
        # Index the weight table directly so plain Python ints are accepted;
        # reshape to (seq_len=1, batch=1, hidden_size) as nn.LSTM expects.
        embedding_vector = self.embedding_matrix.weight[word_num].view(1, 1, -1)
        o, (hn, cn) = self.lstm(embedding_vector, (hidden, cell))
        return o, (hn, cn)

    def _module_device(self):
        """Return the device on which this module's parameters live."""
        return next(self.parameters()).device

    def initHidden(self, device=None):
        """Return the all-zero initial hidden state.

        The state is 3-D because ``nn.LSTM`` expects its states shaped
        (num_layers * num_directions, batch, hidden_size); this model uses one
        unidirectional layer and a batch of one, hence (1, 1, hidden_size).

        Args:
            device: Device to allocate the tensor on. Defaults to the device
                of this module's parameters.

        Returns:
            Zero tensor of shape (1, 1, hidden_size).
        """
        if device is None:
            device = self._module_device()
        return torch.zeros(1, 1, self.hidden_size, device=device)

    def initCell(self, device=None):
        """Return the all-zero initial cell state.

        Shaped (1, 1, cell_size) for the same reason as the hidden state:
        (num_layers * num_directions, batch, hidden_size).

        Args:
            device: Device to allocate the tensor on. Defaults to the device
                of this module's parameters.

        Returns:
            Zero tensor of shape (1, 1, cell_size).
        """
        if device is None:
            device = self._module_device()
        return torch.zeros(1, 1, self.cell_size, device=device)

############################################
#   Decoder
#
#   Decoder for seq2seq model with attention mechanism
#   This Decoder is based on a LSTM structure
############################################
class Decoder(nn.Module):
    """Single-layer LSTM decoder with dot-product attention over encoder states.

    Attributes:
        output_size: Size of the target (French) vocabulary.
        hidden_size: Size of the embedding and hidden vectors.
        cell_size: Size of the LSTM cell vector (equal to ``hidden_size``).
        embedding_matrix: ``nn.Embedding`` of shape (output_size, hidden_size).
        lstm: ``nn.LSTM`` mapping hidden_size -> hidden_size.
        out_linear: ``nn.Linear`` mapping the concatenated
            [attention output, hidden state] (2 * hidden_size) to output_size.
    """

    def __init__(self, output_size, hidden_size):
        """Create the embedding table, the LSTM and the output projection.

        Args:
            output_size: Size of the target word vocabulary.
            hidden_size: Size of the hidden vector and of the cell vector.
        """
        super(Decoder, self).__init__()

        self.output_size = output_size
        self.hidden_size = hidden_size
        self.cell_size = hidden_size

        self.embedding_matrix = nn.Embedding(self.output_size, self.hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size)

        self.out_linear = nn.Linear(self.hidden_size * 2, self.output_size)

    def forward(self, word_num, hidden, cell, hs):
        """Decode one step and attend over the encoder states.

        Args:
            word_num: Index tensor holding the current target word.
            hidden: Previous hidden state of shape (1, 1, hidden_size).
            cell: Previous cell state of shape (1, 1, hidden_size).
            hs: Stacked encoder hidden vectors of shape (N, hidden_size).

        Returns:
            ``(y, (hn, cn), attn_distr)`` where ``y`` is a probability
            distribution over the target vocabulary of shape (1, output_size),
            ``hn``/``cn`` are the next hidden/cell states of shape
            (1, 1, hidden_size), and ``attn_distr`` holds the attention
            weights of shape (1, N). ``y`` contains probabilities, not
            log-probabilities, so take its log before a loss such as
            ``nn.NLLLoss``.
        """
        # (seq_len=1, batch=1, hidden_size) is the layout nn.LSTM expects.
        embedding_vector = self.embedding_matrix(word_num).view(1, 1, -1)
        _, (hn, cn) = self.lstm(embedding_vector, (hidden, cell))

        # Dot-product attention: (N, h) x (h, 1) -> (N, 1) -> (1, N).
        attn_score = torch.mm(hs, hn.view(-1, 1)).view(1, -1)
        # dim is given explicitly; relying on the implicit dim is deprecated.
        attn_distr = F.softmax(attn_score, dim=1)
        # Weighted sum of encoder states: (1, N) x (N, h) -> (1, h).
        attn_output = torch.mm(attn_distr, hs)

        # Project [attention output ; hidden state] (1, 2h) to the vocabulary.
        combined = torch.cat((attn_output, hn.view(1, -1)), dim=1)
        y = F.softmax(self.out_linear(combined), dim=1)
        return y, (hn, cn), attn_distr

    def _module_device(self):
        """Return the device on which this module's parameters live."""
        return next(self.parameters()).device

    def initHidden(self, device=None):
        """Return the all-zero initial hidden state.

        The state is 3-D because ``nn.LSTM`` expects its states shaped
        (num_layers * num_directions, batch, hidden_size); this model uses one
        unidirectional layer and a batch of one, hence (1, 1, hidden_size).

        Args:
            device: Device to allocate the tensor on. Defaults to the device
                of this module's parameters.

        Returns:
            Zero tensor of shape (1, 1, hidden_size).
        """
        if device is None:
            device = self._module_device()
        return torch.zeros(1, 1, self.hidden_size, device=device)

    def initCell(self, device=None):
        """Return the all-zero initial cell state.

        Shaped (1, 1, cell_size) for the same reason as the hidden state:
        (num_layers * num_directions, batch, hidden_size).

        Args:
            device: Device to allocate the tensor on. Defaults to the device
                of this module's parameters.

        Returns:
            Zero tensor of shape (1, 1, cell_size).
        """
        if device is None:
            device = self._module_device()
        return torch.zeros(1, 1, self.cell_size, device=device)

# 학습

앞서 구현한 모델을 이용한 학습을 실현한다.

In [None]:
def train(input_tensor, target_tensor, encoder, decoder, encoder_optimizer, decoder_optimizer, criterion, max_length):
    """Run one optimisation step on a single (source, target) sentence pair.

    The source sentence is encoded word by word, then the decoder is unrolled
    greedily (its own most likely word is fed back as the next input) for at
    most ``len(target_tensor)`` steps, stopping early once it emits EOS.

    Args:
        input_tensor: Long tensor of source word indices, shape (S, 1).
        target_tensor: Long tensor of target label indices, shape (T, 1).
        encoder: Module exposing ``hidden_size``, ``initHidden(device)``,
            ``initCell(device)`` and ``forward(word, hidden, cell)`` returning
            ``(output, (hidden, cell))``.
        decoder: Module whose ``forward(word, hidden, cell, hs)`` returns
            ``(probabilities, (hidden, cell), attention)``.
        encoder_optimizer: Optimizer over the encoder parameters.
        decoder_optimizer: Optimizer over the decoder parameters.
        criterion: Loss taking log-probabilities, e.g. ``nn.NLLLoss``.
        max_length: Number of rows of the encoder-state buffer handed to the
            decoder; must be at least the source length.

    Returns:
        ``(encoder, decoder, loss)`` where ``loss`` is the summed step loss
        divided by the target length, as a Python float.

    Raises:
        ValueError: If the target is empty or the source is longer than
            ``max_length``.
    """
    SOS_token = 2
    EOS_token = 3

    # Allocate every new tensor on the same device as the data.
    device = input_tensor.device

    input_length = input_tensor.size(0)
    target_length = target_tensor.size(0)
    if target_length == 0:
        raise ValueError("target_tensor must contain at least one token")
    if input_length > max_length:
        raise ValueError(
            "input length {} exceeds max_length {}".format(input_length, max_length))

    encoder_optimizer.zero_grad()
    decoder_optimizer.zero_grad()

    encoder_hidden = encoder.initHidden(device)
    encoder_cell = encoder.initCell(device)

    # Rows beyond the source length stay zero.
    encoder_outputs = torch.zeros(max_length, encoder.hidden_size, device=device)

    for ei in range(input_length):
        encoder_output, (encoder_hidden, encoder_cell) = encoder(
            input_tensor[ei], encoder_hidden, encoder_cell)
        encoder_outputs[ei] = encoder_output[0, 0]

    # The decoder starts from SOS and the encoder's final LSTM state.
    decoder_input = torch.tensor([[SOS_token]], device=device)
    decoder_hidden = encoder_hidden
    decoder_cell = encoder_cell

    loss = 0

    for di in range(target_length):
        decoder_output, (decoder_hidden, decoder_cell), decoder_attention = decoder(
            decoder_input, decoder_hidden, decoder_cell, encoder_outputs)

        # The decoder emits probabilities while the criterion expects
        # log-probabilities; clamp to keep log() finite.
        log_probs = torch.log(decoder_output.clamp_min(1e-12))
        loss = loss + criterion(log_probs, target_tensor[di])

        # Greedy decoding: feed back the most likely word, detached from the graph.
        topv, topi = decoder_output.topk(1)
        decoder_input = topi.squeeze().detach()
        if decoder_input.item() == EOS_token:
            break

    loss.backward()

    encoder_optimizer.step()
    decoder_optimizer.step()

    return encoder, decoder, loss.item() / target_length

In [None]:
def LSTM_trainer(source_size, target_size, hidden_size, source_ind_texts, target_ind_texts, target_ind_labels, max_length, iteration=1000, learning_rate=0.01, device=None):
    """Build an Encoder/Decoder pair and train it on randomly sampled pairs.

    ``hidden_size`` has no default: a defaulted parameter cannot precede the
    required ones that follow it, and the parameter order is kept as is.

    Args:
        source_size: Size of the source (English) vocabulary.
        target_size: Size of the target (French) vocabulary.
        hidden_size: Size of the hidden layer.
        source_ind_texts: Source sentences as lists of indices.
        target_ind_texts: Target decoder-input sentences as lists of indices
            (accepted for interface symmetry; not used by this function).
        target_ind_labels: Target label sentences as lists of indices.
        max_length: Maximum sentence length, forwarded to ``train``.
        iteration: Number of training steps (default 1000).
        learning_rate: SGD learning rate (default 0.01).
        device: Device to train on. Defaults to CUDA when available, else CPU.

    Returns:
        ``(encoder, decoder)`` after training.

    Raises:
        ValueError: If there are no sentence pairs to train on.
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    encoder = Encoder(source_size, hidden_size).to(device)
    decoder = Decoder(target_size, hidden_size).to(device)

    # Each sentence becomes a column vector of indices, shape (length, 1).
    pairs = [(torch.tensor(s, dtype=torch.long, device=device).view(-1, 1),
              torch.tensor(t, dtype=torch.long, device=device).view(-1, 1))
             for s, t in zip(source_ind_texts, target_ind_labels)]
    if not pairs:
        raise ValueError("no sentence pairs to train on")

    encoder_optimizer = torch.optim.SGD(encoder.parameters(), lr=learning_rate)
    decoder_optimizer = torch.optim.SGD(decoder.parameters(), lr=learning_rate)
    criterion = nn.NLLLoss()

    start = time.time()
    print_loss_total = 0

    # Steps are counted from 1 so each report averages exactly 100 losses.
    for step in range(1, iteration + 1):
        input_tensor, target_tensor = random.choice(pairs)

        encoder, decoder, loss = train(input_tensor, target_tensor, encoder,
                                       decoder, encoder_optimizer, decoder_optimizer,
                                       criterion, max_length)
        print_loss_total += loss

        if step % 100 == 0:
            print_loss_avg = print_loss_total / 100
            print_loss_total = 0
            print("{} / {}\n소요 시간: {}\n진행률: {}%\n 평균 손실: {}".format(step, iteration, time.time() - start, step / iteration * 100, print_loss_avg))

    return encoder, decoder

# 평가

학습이 잘 되었는지 BLEU 점수(`nltk.translate.bleu_score`)를 이용해 평가한다.

In [24]:
def evaluate(encoder, decoder, sentence, max_length):
    """Placeholder for evaluating a single sentence; not implemented yet.

    The arguments are accepted and ignored.

    Returns:
        None.
    """
    return None

In [25]:
def random_evaluate(encoder, decoder):
    """Placeholder for evaluating randomly chosen sentences; not implemented yet.

    The arguments are accepted and ignored.

    Returns:
        None.
    """
    return None

In [20]:
# BLEU sanity check on two hand-written hypotheses and their references.
# Sentences are written as plain strings and split on whitespace into tokens.
hyp1 = ('It is a guide to action which ensures that the military '
        'always obeys the commands of the party').split()
ref1a = ('It is a guide to action that ensures that the military '
         'will forever heed Party commands').split()
ref1b = ('It is the guiding principle which guarantees the military '
         'forces always being under the command of the Party').split()
ref1c = ('It is the practical guide for the army always to heed '
         'the directions of the party').split()
hyp2 = 'he read the book because he was interested in world history'.split()
ref2a = 'he was interested in world history because he read the book'.split()

# Corpus-level score: one list of references per hypothesis
# (ref1c is defined above but not passed as a reference here).
bleu.corpus_bleu(
    [[ref1a, ref1b], [ref2a]],
    [hyp1, hyp2],
)

0.5673740569387403

# 메인

In [22]:
def main():
    """Tokenize the training corpus, build both vocabularies and print them.

    Prints the tokenized sentence triples, the source and target vocabularies,
    the index-encoded triples and the maximum sentence lengths. Training and
    evaluation are currently disabled.
    """
    def show_triples(sources, targets, labels):
        # Print every (source, decoder input, decoder label) triple.
        for eng, fra, label_fra in zip(sources, targets, labels):
            print("(1) ENG :", eng, "\n(2) FRA :", fra, "\n(3) LABEL FRA :", label_fra, "\n")

    def show_vocabulary(title, word2index, index2word):
        # Print the vocabulary size followed by both mapping directions.
        print("------------------------------------")
        print(title, len(word2index))
        print(list(word2index.items()))
        print(list(index2word.items()))

    source_texts, target_texts, target_labels = tokenize('data/eng-fra_train.txt')
    show_triples(source_texts, target_texts, target_labels)

    # Vocabularies are built before anything is printed about them.
    source_word2index, source_index2word = preprocess(source_texts)
    target_word2index, target_index2word = preprocess(target_texts)
    source_size = len(source_word2index)
    target_size = len(target_word2index)

    show_vocabulary("SIZE source_word2index : ", source_word2index, source_index2word)
    show_vocabulary("SIZE target_word2index : ", target_word2index, target_index2word)

    # Replace every token with its vocabulary index.
    source_ind_texts = wordtext2indtext(source_texts, source_word2index)
    target_ind_texts = wordtext2indtext(target_texts, target_word2index)
    target_ind_labels = wordtext2indtext(target_labels, target_word2index)
    show_triples(source_ind_texts, target_ind_texts, target_ind_labels)

    source_max_length = max(len(each) for each in source_ind_texts)
    target_max_length = max(len(each) for each in target_ind_texts)
    max_length = max(source_max_length, target_max_length)
    print("------------------------------------")
    print("SOURCE WORD2INDEX MAX LENGTH : ", source_max_length)
    print("TARGET WORD2INDEX MAX LENGTH : ", target_max_length)

    # Training and evaluation are left disabled for now:
    # encoder, decoder = LSTM_trainer(source_size, target_size, 256, source_ind_texts, target_ind_texts, target_ind_labels, max_length)
    # random_evaluate(encoder, decoder)

# Run the demo only when executed directly (script or notebook cell),
# not as a side effect of importing this module.
if __name__ == "__main__":
    main()

(1) ENG : ['go', '.'] 
(2) FRA : ['<sos>', 'va', '!', '<eos>'] 
(3) LABEL FRA : ['va', '!', '<eos>'] 

(1) ENG : ['help', '!'] 
(2) FRA : ['<sos>', 'a', 'l', 'aide', '!', '<eos>'] 
(3) LABEL FRA : ['a', 'l', 'aide', '!', '<eos>'] 

(1) ENG : ['stop', '!'] 
(2) FRA : ['<sos>', 'ca', 'suffit', '!', '<eos>'] 
(3) LABEL FRA : ['ca', 'suffit', '!', '<eos>'] 

(1) ENG : ['stop', '!'] 
(2) FRA : ['<sos>', 'stop', '!', '<eos>'] 
(3) LABEL FRA : ['stop', '!', '<eos>'] 

(1) ENG : ['stop', '!'] 
(2) FRA : ['<sos>', 'arrete', 'toi', '!', '<eos>'] 
(3) LABEL FRA : ['arrete', 'toi', '!', '<eos>'] 

(1) ENG : ['go', 'on', '.'] 
(2) FRA : ['<sos>', 'poursuis', '.', '<eos>'] 
(3) LABEL FRA : ['poursuis', '.', '<eos>'] 

(1) ENG : ['go', 'on', '.'] 
(2) FRA : ['<sos>', 'continuez', '.', '<eos>'] 
(3) LABEL FRA : ['continuez', '.', '<eos>'] 

(1) ENG : ['go', 'on', '.'] 
(2) FRA : ['<sos>', 'poursuivez', '.', '<eos>'] 
(3) LABEL FRA : ['poursuivez', '.', '<eos>'] 

(1) ENG : ['i', 'see', '.'] 
(2) FRA :