# seq2seq+attn 스페인-영어 번역기

In [None]:
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split

import matplotlib.ticker as ticker
import matplotlib.pyplot as plt

import time
import re
import os
import io

## 데이터 불러오기

In [None]:
path_to_zip = tf.keras.utils.get_file('spa-eng.zip', origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip', extract=True)

Downloading data from http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip


In [None]:
path_to_file = os.path.dirname(path_to_zip) + "/spa-eng/spa.txt"

In [None]:
with open(path_to_file, "r") as f:
    raw = f.read().splitlines()

print("Data Size : ", len(raw))
print("Example : ")

for sen in raw[0:100][::20]: print(">>", sen)

Data Size :  118964
Example : 
>> Go.	Ve.
>> Wait.	Esperen.
>> Hug me.	Abrázame.
>> No way!	¡Ni cagando!
>> Call me.	Llamame.


\t 

## 데이터 전처리 : 정제하기

In [None]:
def preprocess_sentence(sentence, s_token=False, e_token=False):
    sentence = sentence.lower().strip()

    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)

    sentence = sentence.strip()

    if s_token:
        sentence = '<start> ' + sentence
    
    if e_token:
        sentence += ' <end>'

    return sentence

In [None]:
enc_corpus = []
dec_corpus = []

num_examples = 30000

for pair in raw[:num_examples]:
    eng, spa = pair.split("\t")

    enc_corpus.append(preprocess_sentence(eng))
    dec_corpus.append(preprocess_sentence(spa, s_token=True, e_token=True))

print("English :", enc_corpus[100]) # 영어 텍스트
print("Spanish :", dec_corpus[100]) # 스페인 텍스트

English : go away !
Spanish : <start> salga de aqu ! <end>


## 데이터 전처리 : 토큰화

In [None]:
def tokenize(corpus):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    tokenizer.fit_on_texts(corpus)

    tensor = tokenizer.texts_to_sequences(corpus)

    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')

    return tensor, tokenizer

In [None]:
# 토큰화
enc_tensor, enc_tokenizer = tokenize(enc_corpus)
dec_tensor, dec_tokenizer = tokenize(dec_corpus)

# 훈련 데이터와 검증 데이터로 분리하기 (80/20) ---> test_size = 0.2
enc_train, enc_val, dec_train, dec_val = train_test_split(enc_tensor, dec_tensor, test_size=0.2)

print("English Vocab Size : ", len(enc_tokenizer.index_word))
print("Spanish Vocab Size : ", len(dec_tokenizer.index_word))

English Vocab Size :  4931
Spanish Vocab Size :  8893


# 모델 설계

![](https://aiffelstaticprd.blob.core.windows.net/media/images/GN-4-P-2.max-800x600.jpg)

In [None]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_decoder = tf.keras.layers.Dense(units)
        self.w_encoder = tf.keras.layers.Dense(units)
        self.w_combine = tf.keras.layers.Dense(1)
    
    def call(self, h_encoder, h_decoder):
        # h_enc shape : [batch x length x units]
        # h_dec shape : [batch x units]

        h_encoder = self.w_encoder(h_encoder)

        h_decoder = tf.expand_dims(h_decoder, 1)
        h_decoder = self.w_decoder(h_decoder)

        score = self.w_combine(tf.nn.tanh(h_decoder + h_encoder))

        attention_weights = tf.nn.softmax(score, axis=1)

        context_vector = attention_weights * h_decoder
        context_vector = tf.reduce_sum(context_vector, axis = 1)

        return context_vector, attention_weights

In [None]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(enc_units, return_sequences=True)

    def call(self, x):
        out = self.embedding(x)
        out = self.gru(out)

        return out

In [25]:
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(dec_units, return_sequences=True, return_state=True)
        self.fc = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, h_dec, enc_out):
        context_vec, attn = self.attention(enc_out, h_dec)
        #print("컨텍스트 벡터의 차원 : ", context_vec.shape) #(64, 1024)
    
        out = self.embedding(x)
        out = tf.concat([tf.expand_dims(context_vec, 1), out], axis= -1)

        out, h_dec = self.gru(out)
        #print("gru 차원 : ", out.shape) #(64, 1, 1024)
        
        out = tf.reshape(out, (-1, out.shape[2])) # (64, 1024)
        #print("변경된 out의 차원 : ", out.shape)
        out = self.fc(out) 
        #print("fc 거친 out의 차원 :", out.shape) # (64, 8894)

        return out, h_dec, attn

In [26]:
batch_size = 64
src_vocab_size = len(enc_tokenizer.index_word) + 1
tgt_vocab_size = len(dec_tokenizer.index_word) + 1

units = 1024
embedding_dim = 512

encoder = Encoder(src_vocab_size, embedding_dim, units)
decoder = Decoder(tgt_vocab_size, embedding_dim, units)

######
# sample input
sequence_len = 30

sample_enc = tf.random.uniform((batch_size, sequence_len))
sample_output = encoder(sample_enc)

print('Encoder Output: ', sample_output.shape)

sample_state = tf.random.uniform((batch_size, units))

sample_logits, h_dec, attn = decoder(tf.random.uniform((batch_size, 1)), sample_state, sample_output)

print('Decoder output :', sample_logits.shape)
print('Decoder Hidden state :', h_dec.shape)
print('Attention :', attn.shape)

Encoder Output:  (64, 30, 1024)
Decoder output : (64, 8894)
Decoder Hidden state : (64, 1024)
Attention : (64, 30, 1)


## 훈련하기 : Optimizer & loss

In [27]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none') # 모델 출력값을 그대로 전달한다.

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real,0))
    loss = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss.dtype) # <PAD>
    loss *= mask

    return tf.reduce_mean(loss)

    #[ 0.1 0.2 0.7] ----> 2

## 훈련하기 : train_step 구하기

train step 학습과정
1. Encoder에 소스 문장을 전달해 컨텍스트 벡터인 enc_out을 생성
2. Decoder에 입력으로 전달할 토큰 문장 생성
3. t=0일 때, Decoder의 Hidden state는 Encoder의 Final state로 정의. h_dec = enc_out[:, -1]
4. 문장과 enc_out, Hidden state를 기반으로 다음단어 (t=1)예측 pred
5. 예측된 단어와 정답간의 loss을 구한 후, t=1의 정답 단어를 다음 입력으로 사용 (예측단어X)
6. 반복!

In [28]:
@tf.function # 가속 연산
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    bsz = src.shape[0]
    loss = 0

    with tf.GradientTape() as tape: # 학습하면서 발생한 모든 연산을 기록하는 테이프
        enc_out = encoder(src)
        h_dec = enc_out[:, -1]
        
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)

        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            loss += loss_function(tgt[:, t], pred)
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return batch_loss

## 훈련하기 : 훈련시작하기

In [29]:
from tqdm import tqdm
import random

epochs = 10

for epoch in range(epochs):
    total_loss = 0

    idx_list = list(range(0, enc_train.shape[0], batch_size))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (batch, idx) in enumerate(t):
        batch_loss = train_step(enc_train[idx:idx+batch_size],
                                dec_train[idx:idx+batch_size],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
        total_loss += batch_loss

        t.set_description_str('Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))

Epoch  1: 100%|██████████| 375/375 [02:17<00:00,  2.72it/s, Loss 1.5000]
Epoch  2: 100%|██████████| 375/375 [01:46<00:00,  3.53it/s, Loss 1.2047]
Epoch  3: 100%|██████████| 375/375 [01:45<00:00,  3.55it/s, Loss 1.0403]
Epoch  4: 100%|██████████| 375/375 [01:45<00:00,  3.55it/s, Loss 0.9297]
Epoch  5: 100%|██████████| 375/375 [01:45<00:00,  3.56it/s, Loss 0.8520]
Epoch  6: 100%|██████████| 375/375 [01:46<00:00,  3.53it/s, Loss 0.7959]
Epoch  7: 100%|██████████| 375/375 [01:45<00:00,  3.56it/s, Loss 0.7480]
Epoch  8: 100%|██████████| 375/375 [01:45<00:00,  3.56it/s, Loss 0.7112]
Epoch  9: 100%|██████████| 375/375 [01:45<00:00,  3.56it/s, Loss 0.6801]
Epoch 10: 100%|██████████| 375/375 [01:45<00:00,  3.57it/s, Loss 0.6608]


In [30]:
@tf.function
def eval_step(src, tgt, encoder, decoder, dec_tok):
  bsz = src.shape[0]
  loss =0

  enc_out = encoder(src)
  h_dec = enc_out[:, -1]

  dec_src = tf.expand_dims([dec_tok.word_index['<start>']]* bsz, 1)

  for t in range(1, tgt.shape[1]):
    pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)
    loss += loss_function(tgt[:, t], pred)
    dec_src = tf.expand_dims(tgt[:, t], 1)

  batch_loss = (loss/int(tgt.shape[1]))

  return batch_loss

In [32]:
# Training Process
from tqdm import tqdm

EPOCHS = 1

for epoch in range(EPOCHS):
  total_loss = 0

  idx_list = list(range(0, enc_train.shape[0], batch_size))
  random.shuffle(idx_list)
  t = tqdm(idx_list)

  for (batch, idx) in enumerate(t):
    batch_loss = train_step(enc_train[idx:idx+batch_size],
                            dec_train[idx:idx+batch_size],
                            encoder,
                            decoder,
                            optimizer,
                            dec_tokenizer)
    total_loss += batch_loss

  t.set_description_str('Epoch %2d' % (epoch +1))
  t.set_postfix_str('Loss %.4f' % (total_loss.numpy()/ (batch+1)))

  test_loss = 0

  idx_list = list(range(0, enc_val.shape[0], batch_size))
  random.shuffle(idx_list)
  t = tqdm(idx_list)

  for (test_batch, idx) in enumerate(t):
    test_batch_loss = eval_step(enc_val[idx:idx+batch_size],
                                dec_val[idx:idx+batch_size],
                                encoder,
                                decoder,
                                dec_tokenizer)
    test_loss += test_batch_loss

  t.set_description_str('Test Epoch %2d' % (epoch+1))
  t.set_postfix_str('Test Loss %.4f' % (test_loss.numpy()/ (test_batch+1)))

100%|██████████| 375/375 [01:44<00:00,  3.60it/s]
100%|██████████| 94/94 [00:24<00:00,  3.88it/s]


In [33]:
def evaluate(sentence, encoder, decoder):
  attention = np.zeros((dec_train.shape[-1], enc_train.shape[-1]))
  sentence = preprocess_sentence(sentence)
  inputs = enc_tokenizer.texts_to_sequences([sentence.split()])
  inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                         maxlen = enc_train.shape[-1],
                                                         padding= 'post')
  
  result = ''
  enc_out = encoder(inputs)
  dec_hidden = enc_out[:, -1]
  dec_input = tf.expand_dims([dec_tokenizer.word_index['<start>']], 0)

  for t in range(dec_train.shape[-1]):
    predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                         dec_hidden,
                                                         enc_out)
    
    attention_weights = tf.reshape(attention_weights, (-1, ))
    attention[t] = attention_weights.numpy()

    predicted_id = \
    tf.argmax(tf.math.softmax(predictions, axis=-1)[0]).numpy()

    result += dec_tokenizer.index_word[predicted_id] + ' '

    if dec_tokenizer.index_word[predicted_id] == '<end>':
      return result, sentence, attention

    dec_input = tf.expand_dims([predicted_id], 0)

  return result, sentence, attention

In [34]:
def plot_attention(attention, sentence, predicted_sentence):
  fig = plt.figure(figsize=(10, 10))
  ax = fig.add_subplot(1, 1, 1)
  ax.matshow(attention, cmap='viridis')

  fontdict = {'fontsize': 14}

  ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
  ax.set_yticklabels([''] + predicted_sentence, fontdict = fontdict)

  ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
  ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

  plt.show()

In [35]:
def translate(sentence, encoder, decoder):
  result, sentence, attention = evaluate(sentence, encoder, decoder)

  print('Input : %s' % (sentence))
  print('Predicted translation : {}'.format(result))

  attention = attention[:len(result.split()), :len(sentence.split())]
  plot_attention(attention, sentence.split(), result.split(' '))

In [2]:
translate("Can I have some coffee?", encoder, decoder)

NameError: ignored

# Proj. 한-영 번역기 만들기

한-영 번역기 만들기
1. 데이터 다운로드
- 데이터 : https://github.com/jungyeul/korean-parallel-corpora/tree/master/korean-english-news-v1
- korean-english-park.train.tar.gz
2. 데이터 정제
- set 데이터형이 중복이 허용하지 않다는 것을 활용해 중복된 데이터를 제거
  - 데이터 병렬 쌍이 흐트러지지 않게 주의!
  - cleaned_corpus에 저장
- 앞서 정의한 preprocessing()함수는 한글에 대해 동작하지 않아요.
  - 한글에 적용할 수 있는 정규식을 추가해여 함수를 재정의 하세요.
- 타겟 언어인 영문엔 <\start>토큰과 <\end>토큰을 추가하고 split()함수로 토큰화 합니다. 한글 토큰화는 konlpy의 mecab클래스를 사용합니다.
  - cleaned_corpus로부터 토큰의길이가 40이하인 데이터를 선별하여 eng_corpus와 kor_corpus를 각각 구축하기

3. 토큰화
- tokenize()함수를 사용해 데이터를 텐서로 변환하고 각각의 tokenizer를 얻으세요!
  - 단어수는 실험을 통해 적당한 값을 맞춰줍시다(최소 10000이상!)
4. 훈련하기

In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
 
%config InlineBackend.figure_format = 'retina'
 
import matplotlib.font_manager as fm
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager._rebuild()