**16-01 시퀀스투시퀀스**

**16-02 Seq2Seq를 이용한 번역기 구현하기**

In [1]:
#데이터 로드 및 전처리

#import
import re
import os
import unicodedata
import urllib3
import zipfile
import shutil
import numpy as np
import pandas as pd
import torch
from collections import Counter
from tqdm import tqdm
from torch.utils.data import DataLoader, TensorDataset

In [2]:
num_samples = 33000

In [3]:
#전처리 - 구두점 제거 / 단어와 구분
def unicode_to_ascii(s):
    return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

In [4]:
def preprocess_sentence(sent):
    sent = unicode_to_ascii(sent.lower())

    #단어와 구두점 사이에 공백
    sent= re.sub(r"([?.!,¿])", r" \1", sent)

    #문자 제외 공백으로 변환
    sent = re.sub(r"[^a-zA-Z!.?]+", r" ",sent)

    #다수의 공백을 하나의 공백으로
    sent = re.sub(r"\s+", " ",sent)
    return sent

In [5]:
def load_preprocessed_data():
    encoder_input, decoder_input, decoder_target = [], [], []

    with open('fra.txt', 'r') as lines:
        for i, line in enumerate(lines):
            # source 데이터와 target 데이터 분리
            src_line, tar_line, _ = line. strip().split('\t')

            #source 데이터 전처리
            src_line = [w for w in preprocess_sentence(src_line).split()]

            #target 데이터 전처리
            tar_line = preprocess_sentence(tar_line)
            tar_line_in = [w for w in ("<sos> " + tar_line).split()]
            tar_line_out = [w for w in (tar_line + " <eos>").split()]

            encoder_input.append(src_line)
            decoder_input.append(tar_line_in)
            decoder_target.append(tar_line_out)

            if i == num_samples - 1:
                break
        return encoder_input, decoder_input, decoder_target



In [6]:
# 전처리 테스트
en_sent = u"Have you had dinner?"
fr_sent = u"Avez-vous déjà diné?"

print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :',preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장 :', fr_sent)
print('전처리 후 프랑스어 문장 :', preprocess_sentence(fr_sent))


전처리 전 영어 문장 : Have you had dinner?
전처리 후 영어 문장 : have you had dinner ?
전처리 전 프랑스어 문장 : Avez-vous déjà diné?
전처리 후 프랑스어 문장 : avez vous deja dine ?


In [7]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()


In [8]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
print('인코더의 입력 :',sents_en_in[:5])
print('디코더의 입력 :',sents_fra_in[:5])
print('디코더의 레이블 :',sents_fra_out[:5])


인코더의 입력 : [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더의 입력 : [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더의 레이블 : [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


In [9]:
#vocab
def build_vocab(sents):
    word_list = []

    for sent in sents:
        for word in sent:
            word_list.append(word)

    #빈도 순 정렬
    word_counts = Counter(word_list)
    vocab = sorted(word_counts, key=word_counts.get, reverse = True)

    word_to_index = {}
    word_to_index['<PAD>'] = 0
    word_to_index['<UNK>'] = 1

    #빈도가 높을 수록 낮은 정수
    for index, word in enumerate(vocab):
        word_to_index[word] = index +2

    return word_to_index

In [10]:
#영어 단어 집합(src) / 프랑스어 단어 집합(tar)
src_vocab = build_vocab(sents_en_in)
tar_vocab = build_vocab(sents_fra_in + sents_fra_out)

src_vocab_size = len(src_vocab)
tar_vocab_size = len(tar_vocab)
print("영어 단어 집합의 크기 : {:d}, 프랑스어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))


영어 단어 집합의 크기 : 4488, 프랑스어 단어 집합의 크기 : 7884


In [11]:
#정수로부터 단어를 얻는 딕셔너리를 각각 생성
index_to_src = {v: k for k, v in src_vocab.items()}
index_to_tar = {v: k for k, v in tar_vocab.items()}

def texts_to_sequences(sents, word_to_index):
    encoded_X_data = []
    for sent in tqdm(sents):
        index_sequences = []
        for word in sent:
            try:
                index_sequences. append(word_to_index[word])
            except KeyError:
                index_sequences.append(word_to_index['<UNK>'])
        encoded_X_data.append(index_sequences)
    return encoded_X_data

In [12]:
encoder_input = texts_to_sequences(sents_en_in, src_vocab)
decoder_input = texts_to_sequences(sents_fra_in, tar_vocab)
decoder_target = texts_to_sequences(sents_fra_out, tar_vocab)


100%|██████████| 33000/33000 [00:00<00:00, 316936.86it/s]
100%|██████████| 33000/33000 [00:00<00:00, 301722.18it/s]
100%|██████████| 33000/33000 [00:00<00:00, 88761.62it/s]


In [13]:
# 상위 5개의 샘플에 대해서 정수 인코딩 전, 후 문장 출력
# 인코더 입력이므로 <sos>나 <eos>가 없음
for i, (item1, item2) in zip(range(5), zip(sents_en_in, encoder_input)):
    print(f"Index: {i}, 정수 인코딩 전: {item1}, 정수 인코딩 후: {item2}")


Index: 0, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 1, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 2, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 3, 정수 인코딩 전: ['go', '.'], 정수 인코딩 후: [27, 2]
Index: 4, 정수 인코딩 전: ['hi', '.'], 정수 인코딩 후: [742, 2]


In [14]:
#패딩
def pad_sequences(sentences, max_len=None):
    # 최대 길이 값이 주어지지 않을 경우 데이터 내 최대 길이로 패딩
    if max_len is None:
        max_len = max([len(sentence) for sentence in sentences])

    features = np.zeros((len(sentences), max_len), dtype=int)
    for index, sentence in enumerate(sentences):
        if len(sentence) != 0:
            features[index, :len(sentence)] = np.array(sentence)[:max_len]
    return features


In [15]:
encoder_input = pad_sequences(encoder_input)
decoder_input = pad_sequences(decoder_input)
decoder_target = pad_sequences(decoder_target)


In [16]:
#크기 확인
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)


인코더의 입력의 크기(shape) : (33000, 7)
디코더의 입력의 크기(shape) : (33000, 16)
디코더의 레이블의 크기(shape) : (33000, 16)


In [17]:
#분리하기 전 데이터를 섞기
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('랜덤 시퀀스 :',indices)


랜덤 시퀀스 : [ 2716 26862 21398 ... 28650 15084  9690]


In [18]:
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

In [19]:
print([index_to_src[word] for word in encoder_input[30997]])
print([index_to_tar[word] for word in decoder_input[30997]])
print([index_to_tar[word] for word in decoder_target[30997]])


['she', 'has', 'dry', 'hair', '.', '<PAD>', '<PAD>']
['<sos>', 'elle', 'a', 'une', 'chevelure', 'seche', '.', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']
['elle', 'a', 'une', 'chevelure', 'seche', '.', '<eos>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>', '<PAD>']


In [20]:
#10퍼센트를 테스트 데이터
n_of_val = int(33000*0.1)
print('검증 데이터의 개수 :',n_of_val)


검증 데이터의 개수 : 3300


In [21]:
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

In [22]:
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)


훈련 source 데이터의 크기 : (29700, 7)
훈련 target 데이터의 크기 : (29700, 16)
훈련 target 레이블의 크기 : (29700, 16)
테스트 source 데이터의 크기 : (3300, 7)
테스트 target 데이터의 크기 : (3300, 16)
테스트 target 레이블의 크기 : (3300, 16)


In [23]:
#기계 번역기 만들기
import torch
import torch.nn as nn
import torch.optim as optim

embedding_dim = 256
hidden_units = 256

class Encoder(nn.Module):
    def __init__(self, src_vocab_size, embedding_dim, hidden_units):
        super(Encoder, self).__init()
        self.embedding = nn.Embedding(src_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, batch_first=True)

    def forward(self, x):
        x = self.embedding(x)
        _, (hidden, cell) = self.lstm(x)
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, tar_vocab_size, embedding_dim, hidden_units):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(tar_vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_units, batch_first = True)
        self.fc = nn.Liinear(hidden_units, tar_vocab_size)

    def forward(self, x, hidden, cell):
