<a href="https://colab.research.google.com/github/chriss006/2024_Advanced_DeepLearning/blob/main/LSTM_based_Seq2Seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **LSTM-based-Seq2Seq for FRA-ENG Translation**

In [2]:
import os
import re
import shutil
import zipfile

import numpy as np
import pandas as pd
import tensorflow as tf
import unicodedata
import urllib3
import requests
from tensorflow.keras.layers import Embedding, GRU, Dense
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from sklearn.model_selection import train_test_split

In [3]:
headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}

def download_zip(url, output_path):
  response = requests.get(url, headers=headers, stream=True)
  if response.status_code == 200:
    with open(output_path, 'wb') as file:
      for chunk in response.iter_content(chunk_size=1024):
        file.write(chunk)
      print(f'Zip file download to {output_path}')
  else:
    print(f"Failed to download the file. Status code: {response.status_code}")

url = 'http://www.manythings.org/anki/fra-eng.zip'
output_path = 'fra-eng.zip'
download_zip(url, output_path)

path = os.getcwd()
zipfilename = os.path.join(path, output_path)

with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
  zip_ref.extractall(path)

Zip file download to fra-eng.zip


#### **Data Preprocessing**

In [7]:
num_samples = 33000

def to_ascii(s):
  # 프랑스어 악센트(accent) 삭제
  # 예시 : 'déjà diné' -> deja dine
  return ''.join(c for c in unicodedata.normalize('NFD', s)
                   if unicodedata.category(c) != 'Mn')

def preprocess_sentence(sent):
  # 악센트 제거 함수 호출
  sent = to_ascii(sent.lower())

  # 단어와 구두점 사이에 공백 추가.
  # ex) "I am a student." => "I am a student ."
  sent = re.sub(r"([?.!,¿])", r" \1", sent)

  # (a-z, A-Z, ".", "?", "!", ",") 이들을 제외하고는 전부 공백으로 변환.
  sent = re.sub(r"[^a-zA-Z!.?]+", r" ", sent)

  # 다수 개의 공백을 하나의 공백으로 치환
  sent = re.sub(r"\s+", " ", sent)
  return sent

en_sent = u'Have you had dinner?'
fr_sent = u'Avez-vous déjà diné?'

print('전처리 전 영어 문장: ', en_sent)
print('전처리 후 영어 문장: ', preprocess_sentence(en_sent))
print('전처리 전 프랑스어 문장: ', fr_sent)
print('전처리 후 프랑스어 문장: ', preprocess_sentence(fr_sent))

전처리 전 영어 문장:  Have you had dinner?
전처리 후 영어 문장:  have you had dinner ?
전처리 전 프랑스어 문장:  Avez-vous déjà diné?
전처리 후 프랑스어 문장:  avez vous deja dine ?


In [8]:
def load_preprocessed_data():
  encoder_input, decoder_input, decoder_target= [], [], []

  with open('fra.txt','r') as lines:
    for i, line in enumerate(lines):
      src_line, tar_line, _ = line.strip().split('\t')

      src_line_input = [w for w in preprocess_sentence(src_line).split()]

      tar_line = preprocess_sentence(tar_line)
      tar_line_in = [w for w in ("<sos> " + tar_line).split()]
      tar_line_out = [w for w in (tar_line + " <eos>").split()]

      encoder_input.append(src_line_input)
      decoder_input.append(tar_line_in)
      decoder_target.append(tar_line_out)

      if i == num_samples -1:
        break
  return encoder_input, decoder_input, decoder_target

In [9]:
sents_en_in, sents_fra_in, sents_fra_out = load_preprocessed_data()
print('인코더의 입력:', sents_en_in[:5])
print('디코더의 입력:', sents_fra_in[:5])
print('디코더의 레이블:', sents_fra_out[:5])

인코더의 입력: [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더의 입력: [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더의 레이블: [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


In [11]:
tokenizer_en = Tokenizer(filters="", lower=False)
tokenizer_en.fit_on_texts(sents_en_in)
encoder_input = tokenizer_en.texts_to_sequences(sents_en_in)
encoder_input = pad_sequences(encoder_input, padding="post")

tokenizer_fra = Tokenizer(filters="", lower=False)
tokenizer_fra.fit_on_texts(sents_fra_in)
tokenizer_fra.fit_on_texts(sents_fra_out)

decdoer_input = tokenizer_fra.texts_to_sequences(sents_fra_in)
decoder_input = pad_sequences(decdoer_input, padding="post")

decoder_target = tokenizer_fra.texts_to_sequences(sents_fra_out)
decoder_target = pad_sequences(decoder_target, padding="post")

print('인코더의 입력 크기(shape) :',encoder_input.shape)
print('디코더의 입력 크기(shape) :',decoder_input.shape)
print('디코더 레이블 크기(shape) :',decoder_target.shape)

인코더의 입력 크기(shape) : (33000, 7)
디코더의 입력 크기(shape) : (33000, 16)
디코더 레이블 크기(shape) : (33000, 16)


In [12]:
src_vocab_size = len(tokenizer_en.word_index) + 1
tar_vocab_size = len(tokenizer_fra.word_index) + 1
print("영어 단어 집합의 크기 : {:d}, 프랑스어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))

영어 단어 집합의 크기 : 4485, 프랑스어 단어 집합의 크기 : 7878


In [13]:
src_to_index = tokenizer_en.word_index
index_to_src = tokenizer_en.index_word

tar_to_index = tokenizer_fra.word_index
index_to_tar = tokenizer_fra.index_word

indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('랜덤 시퀀스:',indices)

랜덤 시퀀스: [32090  2488 23718 ... 23625 18951  1870]


In [15]:
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

print('인코더의 input sample:', encoder_input[30997])
print('디코더의 input sample:', decoder_input[30997])
print('디코더의 target sample:', decoder_target[30997])

인코더의 input sample: [  2  20 942   1   0   0   0]
디코더의 input sample: [   2   13    5 1542  321    1    0    0    0    0    0    0    0    0
    0    0]
디코더의 target sample: [  13    5 1542  321    1    3    0    0    0    0    0    0    0    0
    0    0]


#### **Train, Validaiton Data Preparation**

In [16]:
n_of_val = int(33000*0.1)
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

print('Train source data size:', encoder_input_train.shape)
print('Train target data size:', decoder_input_train.shape)
print('Train target label size:', decoder_target_train.shape)

print('Test source data size:', encoder_input_test.shape)
print('Test target data size:', decoder_input_test.shape)
print('Test target label size:', decoder_target_test.shape)

Train source data size: (29700, 7)
Train target data size: (29700, 16)
Train target label size: (29700, 16)
Test source data size: (3300, 7)
Test target data size: (3300, 16)
Test target label size: (3300, 16)


#### **Model Building & Training**

In [17]:
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model

embedding_dim = 64
hidden_units = 64

#Encoder
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, embedding_dim)(encoder_inputs)
enc_masking = Masking(mask_value=0.0)(enc_emb)
encoder_lstm = LSTM(hidden_units, return_state=True)
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking)
encoder_states = [state_h, state_c]

#Decoder
decoder_inputs = Input(shape=(None,))
dec_emb_layer = Embedding(tar_vocab_size, embedding_dim)
dec_emb = dec_emb_layer(decoder_inputs)

decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(dec_emb, initial_state=encoder_states)

decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])

In [18]:
model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train, \
          validation_data=([encoder_input_test, decoder_input_test], decoder_target_test), batch_size=128, epochs=50)

Epoch 1/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 34ms/step - acc: 0.6054 - loss: 5.0104 - val_acc: 0.6234 - val_loss: 1.9459
Epoch 2/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m15s[0m 30ms/step - acc: 0.6694 - loss: 1.8563 - val_acc: 0.7465 - val_loss: 1.6713
Epoch 3/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 31ms/step - acc: 0.7478 - loss: 1.6313 - val_acc: 0.7571 - val_loss: 1.5608
Epoch 4/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 30ms/step - acc: 0.7556 - loss: 1.5348 - val_acc: 0.7570 - val_loss: 1.5081
Epoch 5/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 31ms/step - acc: 0.7597 - loss: 1.4780 - val_acc: 0.7631 - val_loss: 1.4661
Epoch 6/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 30ms/step - acc: 0.7640 - loss: 1.4301 - val_acc: 0.7685 - val_loss: 1.4276
Epoch 7/50
[1m233/233[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 32m

<keras.src.callbacks.history.History at 0x7ac6047037f0>

#### **Revised Decoder with Teacher Forcing**

In [19]:
# 인코더
encoder_model = Model(encoder_inputs, encoder_states)

# 디코더 설계 시작
# 이전 시점의 상태를 보관할 텐서
decoder_state_input_h = Input(shape=(hidden_units,))
decoder_state_input_c = Input(shape=(hidden_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# 훈련 때 사용했던 임베딩 층을 재사용
dec_emb2 = dec_emb_layer(decoder_inputs)

# 다음 단어 예측을 위해 이전 시점의 상태를 현 시점의 초기 상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]

# 모든 시점에 대해서 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2)

# 수정된 디코더
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs2] + decoder_states2)

#### **Model Inference**

In [20]:
def decode_sequence(input_seq):
  # 입력으로부터 인코더의 마지막 시점의 상태(은닉 상태, 셀 상태)를 얻음
  states_value = encoder_model.predict(input_seq)

  # <SOS>에 해당하는 정수 생성
  target_seq = np.zeros((1,1))
  target_seq[0, 0] = tar_to_index['<sos>']

  stop_condition = False
  decoded_sentence = ''

  # stop_condition이 True가 될 때까지 루프 반복
  # 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정합니다.
  while not stop_condition:
    # 이점 시점의 상태 states_value를 현 시점의 초기 상태로 사용
    output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

    # 예측 결과를 단어로 변환
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]

    # 현재 시점의 예측 단어를 예측 문장에 추가
    decoded_sentence += ' '+sampled_char

    # <eos>에 도달하거나 정해진 길이를 넘으면 중단.
    if (sampled_char == '<eos>' or
        len(decoded_sentence) > 50):
        stop_condition = True

    # 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = sampled_token_index

    # 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
    states_value = [h, c]

  return decoded_sentence


# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_train[seq_index: seq_index + 1]
  decoded_sentence = decode_sequence(input_seq)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",decoded_sentence[1:-5])
  print("-"*50)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 178ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 145ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 31ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 25ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 21ms/step
입력문장 : he blew the deal . 
정답문장 : avancons . 
번역문장 : je suis un homme sur . 
--------------------------------------------------
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 20ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/