In [1]:
import os
import re
import numpy as np
import pandas as pd
import tensorflow as tf
import unicodedata
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Embedding, GRU, Dense
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer

In [2]:
# 파일
path = 'fra.txt'

In [3]:
#유니코드 -> 아스키코드로
def unicode_to_ascii(s):
  return ''.join(c for c in unicodedata.normalize('NFD',s)
    if unicodedata.category(c) != 'Mn')

In [4]:
def preprocess_sentence(w):
  w = unicode_to_ascii(w.lower())

  #단어와 . 사이 공백
  w = re.sub(r"([?.!,¿])", r" \1 ", w)

  #a-z, A-Z, ., ?, !, ,을 제외하고는 모두 공백으로 변환
  w = re.sub(r"[^a-zA-Z?.!]+", r" ", w)

  w = re.sub(r"\s+", " ", w)
  return w

In [5]:
#전처리 테스트
eng = "May I borrow this book?"
fra = "Puis-je emprunter ce livre?"

print('전처리 전 영어 문장 :', eng)
print('전처리 후 영어 문장 :', preprocess_sentence(eng))
print('\n전처리 전 불어 문장 :', fra)
print('전처리 후 불어 문장 :', preprocess_sentence(fra))

전처리 전 영어 문장 : May I borrow this book?
전처리 후 영어 문장 : may i borrow this book ? 

전처리 전 불어 문장 : Puis-je emprunter ce livre?
전처리 후 불어 문장 : puis je emprunter ce livre ? 


In [6]:
#사용할 샘플 수
num_samples = 100000

In [7]:
def load_preprocessed_data():
  encoder_input, decoder_input, decoder_target = [], [], []

  #"fra.txt" 파일을 읽기 모드로 열고 순회
  with open("fra.txt", "r") as lines:
    for i, line in enumerate(lines):
      # '_' 필요하지 않은 정보를 저장하지 않기 위해사용
      source_line, target_line, _ = line.strip().split('\t')

      source_line = [w for w in preprocess_sentence(source_line).split()]

      target_line = preprocess_sentence(target_line)
      target_line_in = [w for w in ("<sos> " + target_line).split()]
      target_line_out = [w for w in (target_line + " <eos>").split()]

      encoder_input.append(source_line)
      decoder_input.append(target_line_in)
      decoder_target.append(target_line_out)

      #num_samples로 지정한 데이터의 개수에 도달하면 루프를 종료
      if i == num_samples - 1:
        break

  return encoder_input,decoder_input, decoder_target

In [8]:
in_w_eng, in_w_fra, out_w_fra = load_preprocessed_data()
print('인코더 입력 :', in_w_eng[:5])
print('디코더 입력 :', in_w_fra[:5])
print('디코더 레이블 :', out_w_fra[:5])

인코더 입력 : [['go', '.'], ['go', '.'], ['go', '.'], ['go', '.'], ['hi', '.']]
디코더 입력 : [['<sos>', 'va', '!'], ['<sos>', 'marche', '.'], ['<sos>', 'en', 'route', '!'], ['<sos>', 'bouge', '!'], ['<sos>', 'salut', '!']]
디코더 레이블 : [['va', '!', '<eos>'], ['marche', '.', '<eos>'], ['en', 'route', '!', '<eos>'], ['bouge', '!', '<eos>'], ['salut', '!', '<eos>']]


In [9]:
#텍스트 데이터를 정수 시퀀스로 변환하고 패딩 적용

#필터="": 특수문자 제거 x / lower=False: 단어를 소문자로 변환 x
#텍스트 데이터 내 단어들이 정수로 매핑되는 인덱스 생성
#각 문장을 정수 시퀀스로 변환
#시퀀스 뒷부분에 패딩 추가
tokenizer_eng = Tokenizer(filters="", lower=False)
tokenizer_eng.fit_on_texts(in_w_eng)
encoder_input = tokenizer_eng.texts_to_sequences(in_w_eng)
encoder_input = pad_sequences(encoder_input, padding="post")

tokenizer_fra = Tokenizer(filters="", lower=False)
tokenizer_fra.fit_on_texts(in_w_fra)
tokenizer_fra.fit_on_texts(out_w_fra)

decoder_input = tokenizer_fra.texts_to_sequences(in_w_fra)
decoder_input = pad_sequences(decoder_input, padding="post")

decoder_target = tokenizer_fra.texts_to_sequences(out_w_fra)
decoder_target = pad_sequences(decoder_target, padding="post")

In [10]:
print('인코더 입력 크기 :',encoder_input.shape)
print('디코더 입력 크기:',decoder_input.shape)
print('디코더 레이블 크기:',decoder_target.shape)

인코더 입력 크기 : (100000, 10)
디코더 입력 크기: (100000, 17)
디코더 레이블 크기: (100000, 17)


In [11]:
src_vocab_size = len(tokenizer_eng.word_index) + 1
tar_vocab_size = len(tokenizer_fra.word_index) + 1
print("영어 단어 집합 크기: {:d}, 불어 단어 집합 크기: {:d}".format(src_vocab_size, tar_vocab_size))

영어 단어 집합 크기: 8654, 불어 단어 집합 크기: 14583


In [12]:
#단어->정수 딕셔너리, 정수->단어 딕셔너리 생성
src_to_index = tokenizer_eng.word_index
index_to_src = tokenizer_eng.index_word
tar_to_index = tokenizer_fra.word_index
index_to_tar = tokenizer_fra.index_word

In [13]:
#모델의 학습이 일정한 순서로 진행되지 않고, 전체적인 특성을 학습하도록 랜덤 셔플 사용
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print("랜덤 시퀸스: ", indices)

랜덤 시퀸스:  [28781  9516 98068 ... 62671 74512 34590]


In [14]:
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

In [15]:
encoder_input[40000]

array([324,  42, 984,   1,   0,   0,   0,   0,   0,   0], dtype=int32)

In [16]:
decoder_input[40000]

array([  2, 216,  10,  85, 149,   1,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0], dtype=int32)

In [17]:
decoder_target[40000]

#decoder_input과 decoder_target에서 동일 시퀀스 확인

array([216,  10,  85, 149,   1,   3,   0,   0,   0,   0,   0,   0,   0,
         0,   0,   0,   0], dtype=int32)

In [18]:
n_val = int(100000*0.2)
print('검증 데이터 개수: ', n_val)

검증 데이터 개수:  20000


In [19]:
#데이터를 훈련데이터와 검증데이터로 분리
encoder_input_train = encoder_input[:-n_val]
decoder_input_train = decoder_input[:-n_val]
decoder_target_train = decoder_target[:-n_val]

encoder_input_test = encoder_input[-n_val:]
decoder_input_test = decoder_input[-n_val:]
decoder_target_test = decoder_target[-n_val:]

In [20]:
print('훈련 소스 데이터 크기 :',encoder_input_train.shape)
print('훈련 타겟 데이터 크기 :',decoder_input_train.shape)
print('훈련 타겟 레이블 크기 :',decoder_target_train.shape)
print('검증 소스 데이터 크기 :',encoder_input_test.shape)
print('검증 타겟 데이터 크기 :',decoder_input_test.shape)
print('검증 타겟 레이블 크기 :',decoder_target_test.shape)

훈련 소스 데이터 크기 : (80000, 10)
훈련 타겟 데이터 크기 : (80000, 17)
훈련 타겟 레이블 크기 : (80000, 17)
검증 소스 데이터 크기 : (20000, 10)
검증 타겟 데이터 크기 : (20000, 17)
검증 타겟 레이블 크기 : (20000, 17)


In [21]:
#모델 설계

In [22]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking

In [23]:
#임베딩 벡터 차원, LSTM의 은닉상태 크기 64
embedding_dim = 64
hidden_units = 64

In [24]:
encoder_inputs = Input(shape=(None,))

#단어를 임베딩 벡터로 변환
encoder_emb = Embedding(src_vocab_size, embedding_dim)(encoder_inputs)

#입력 시퀀스에서 값이 0인 부분을 마스킹 -> 패딩 부분 무시
encoder_masking = Masking(mask_value=0.0)(encoder_emb)

# LSTM 층의 마지막 시점의 은닉 상태와 셀 상태를 반환
encoder_lstm = LSTM(hidden_units, return_state=True)

#마스킹된 입력 시퀀스를 LSTM 층에 입력으로 전달
encoder_outputs, state_h,state_c = encoder_lstm(encoder_masking)

#인코더의 은닉상태와 셀상태 저장
encoder_states = [state_h, state_c]

In [25]:
decoder_inputs = Input(shape=(None,))
decoder_emb_layer = Embedding(tar_vocab_size, hidden_units)
decoder_emb = decoder_emb_layer(decoder_inputs)
decoder_masking = Masking(mask_value=0.0)(decoder_emb)
decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True)

#인코더의 은닉 상태를 초기 은닉상태로 사용
decoder_outputs, _, _ = decoder_lstm(decoder_masking, initial_state = encoder_states)

#출력층을 통해 단어 예측
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

#Seq2Seq 모델 정의
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

In [None]:
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

In [26]:
history = model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train, validation_data=([encoder_input_test, decoder_input_test], decoder_target_test),
          batch_size=128,
          epochs=50)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x7ce15afc62c0>

In [35]:
# 학습 데이터에 대한 정확도 계산
train_loss, train_accuracy = model.evaluate(x=[encoder_input_train, decoder_input_train], y=decoder_target_train)
print("학습 데이터 정확도:", train_accuracy)

# 검증 데이터에 대한 정확도 계산
test_loss, test_accuracy = model.evaluate(x=[encoder_input_test, decoder_input_test], y=decoder_target_test)
print("검증 데이터 정확도:", test_accuracy)

학습 데이터 정확도: 0.9331102967262268
검증 데이터 정확도: 0.8661147356033325


In [None]:
y_vloss = history.history['val_loss']
y_loss = history.history['loss']

x_len = np.arange(len(y_vloss))
plt.plot(x_len, y_vloss, marker='.', c="red", label='Testset_loss')
plt.plot(x_len, y_loss, marker='.', c="blue", label='Trainset_loss')

plt.legend(loc='upper right')
plt.grid()
plt.xlabel('epoch')
plt.ylabel('loss')
plt.show()

In [27]:
#seq2seq는 훈련과 테스트 과정에서 동작방식이 다르므로 테스트 과정 위해 모델 다시 설계
# -> 번역 단계를 위해 모델 수정하고 동작

In [28]:
encoder_model = Model(encoder_inputs, encoder_states)

decoder_state_input_h = Input(shape=(hidden_units,))
decoder_state_input_c = Input(shape=(hidden_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

#임베딩층 재사용
decoder_emb2 = decoder_emb_layer(decoder_inputs)

#다음 단어 예측위해 이전 시점의 상태를 현 시점의 초기상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(decoder_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]

decoder_outputs2 = decoder_dense(decoder_outputs2)

#수정된 디코더
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs2] + decoder_states2
)

In [29]:
def decoder_sequence(input_seq):
  #입력으로부터 인코더의 마지막 시점의 은닉상태, 셀상태 얻기
  states_value = encoder_model.predict(input_seq)

  #<SOS>에 해당하는 정수 생성
  target_seq = np.zeros((1, 1))
  target_seq[0, 0] = tar_to_index['<sos>']

  stop_condition = False
  decoded_sentence = ''

  #<eos>도달 or 정해진 길이를 넘으면 중단
  while not stop_condition:
    output_tokens, h, c = decoder_model.predict([target_seq]+states_value)

    #예측 결과를 단어로
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]
    #예측 단어를 문장에 추가
    decoded_sentence += ' '+sampled_char
    if(sampled_char == '<eos>' or len(decoded_sentence)>50):
      stop_condition = True

    #현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = sampled_token_index
    #현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
    states_value = [h, c]

  return decoded_sentence

In [30]:
#원문 정수 시퀀스->텍스트 시퀀스
#인코더 입력으로 사용된 영어 문장을 원래 영어 문장으로 변환
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

In [31]:
#번역문 정수 시퀀스->텍스트 시퀀스
#디코더의 출력으로 사용된 불어 문장을 원래 불어 문장으로 변환
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    #encoded_word가 0이 아니고, <sos>와 <eos> 토큰이 아니라면 해당 정수를 프랑스어 단어로 변환
    if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

In [32]:
#훈련데이터 대한 임의의 인덱스 샘플 결과 출력
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_train[seq_index: seq_index + 1]
  decoded_sentence = decoder_sequence(input_seq)

  print("입력 문장: ", seq_to_src(encoder_input_train[seq_index]))
  # 학습 데이터에 있는 불어 문장
  print("정답 문장: ", seq_to_tar(decoder_input_train[seq_index]))
  #디코더 모델에 의해 실제로 생성된 불어 문장
  print("번역 문장: ", decoded_sentence[1:-5])

입력 문장:  don t scribble here . 
정답 문장:  ne gribouille pas ici ! 
번역 문장:  ne gribouille pas ici ! 
입력 문장:  tom do you still love me ? 
정답 문장:  tom est ce que tu m aimes toujours ? 
번역 문장:  tom m aimes tu toujours ? 
입력 문장:  we stayed at a nice hotel . 
정답 문장:  nous sejournames dans un chouette hotel . 
번역 문장:  nous sejournames dans un bel hotel . 
입력 문장:  the sound woke me up . 
정답 문장:  le bruit m a reveille . 
번역 문장:  le bruit m a reveille . 
입력 문장:  my grandfather was a miner . 
정답 문장:  mon grand pere etait mineur . 
번역 문장:  mon reste etait grand groupe . 


In [33]:
#검증 데이터에 대한 임의의 인덱스 샘플 결과 출력
for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input_test[seq_index: seq_index + 1]
  decoded_sentence = decoder_sequence(input_seq)

  print("입력 문장: ", seq_to_src(encoder_input_test[seq_index]))
  print("정답 문장: ", seq_to_tar(decoder_input_test[seq_index]))
  print("번역 문장: ", decoded_sentence[1:-5])
  print("-"*50)

입력 문장:  we didn t think of that . 
정답 문장:  nous n y avons pas pense . 
번역 문장:  nous n en avons pas qu une decision ! 
--------------------------------------------------
입력 문장:  whose phone is this ? 
정답 문장:  a qui est ce telephone ? 
번역 문장:  a qui est ce telephone ? 
--------------------------------------------------
입력 문장:  when were you born ? 
정답 문장:  quand es tu ne ? 
번역 문장:  quand etes vous nees ? 
--------------------------------------------------
입력 문장:  i didn t take the hint . 
정답 문장:  je n ai pas compris l allusion . 
번역 문장:  je ne me suis pas alle au bras . 
--------------------------------------------------
입력 문장:  you ve misspelled my name . 
정답 문장:  vous avez ecorche mon nom . 
번역 문장:  tu as ecorche mon nom . 
--------------------------------------------------
