# RNN
## 1. 개요

- 구현 내용: 간단한 번역 모델을 만들어보며 시계열 데이터를 예측하는 순환신경망(Recurrent Neural Network) 코드를 구현합니다.
- 코드 요약: 데이터를 로드한 뒤 전처리, 단어 임베딩, 인코더, 디코더를 구현합니다.
- 참고 자료: 학교 수업 참고자료 (서울여자대학교)
- 데이터 셋: [Tab-delimited Bilingual Sentence Pairs](https://www.manythings.org/anki/) 중 German - English 를 사용합니다. 코드 내에 데이터를 불러오는 과정을 포함합니다. [User-Agent는 여기](https://www.whatismybrowser.com/detect/what-is-my-user-agent/)를 참고하세요.
- **주의 사항**: Tensorflow는 [tf.keras.preprocessing.text.Tokenizer](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text/Tokenizer)가 지원 종료되기에 tf.keras.layers.TextVectorization 사용을 권장하고 있습니다.

## 2. 코드

In [1]:
# 필요한 라이브러리 import
import os, re, shutil, zipfile
import numpy as np
import pandas as pd
import tensorflow as tf
import unicodedata
import urllib3
from tensorflow.keras.layers import Embedding, GRU, Dense, Input, LSTM, Masking
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.models import Model
import requests

In [2]:
# 개요에 있는 User Agent 웹 사이트 참고하여 자신의 User-Agent 그대로 복사 붙여넣기
headers = {
    'User-Agent' : 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'
}

In [3]:
# zipfile을 활용한 데이터 다운로드 함수
def download_zip(url, output_path):
  response = requests.get(url, headers=headers, stream=True)
  if response.status_code == 200:
    with open(output_path, 'wb') as f:
      for chunk in response.iter_content(chunk_size=8192):
        f.write(chunk)
    print(f"ZIP file downloaded to {output_path}")
  else:
    print(f"Failed to download. HTTP Response Code: {response.status_code}")

In [4]:
# German - English 데이터 다운로드
url = "http://www.manythings.org/anki/deu-eng.zip"
output_path = "deu-eng.zip"
download_zip(url, output_path)

ZIP file downloaded to deu-eng.zip


In [5]:
# 경로 조정 후 압축 해제
path = os.getcwd()
zipfilename = os.path.join(path, output_path)

with zipfile.ZipFile(zipfilename, 'r') as zip_ref:
  zip_ref.extractall(path)

In [6]:
# 전체 데이터 중 40000개 사용
num_samples = 40000

# 데이터 전처리 함수. (정규화 및 정제)
def preprocess_sentence(sent):
  # Nonspacing Mark (발음 기호 등) 제거
  edited_sent = ''.join(c for c in unicodedata.normalize('NFD', sent) if unicodedata.category(c) != 'Mn')
  # 단어와 구두점 사이에 공백 추가
  edited_sent = re.sub(r"([?.!,¿])", r" \1", edited_sent)
  # 영문자, 문장 부호 외 전부 공백 변환
  edited_sent = re.sub(r"[^a-zA-Z!.?]+", r" ", edited_sent)
  # 공백이 여러 개일 때 하나로 축소
  edited_sent = re.sub(r"\s+", " ", edited_sent)
  return edited_sent

In [7]:
# 전처리 테스트
en_sent = u"Please select. Are the statements right or wrong?"
de_sent = u"Wählen Sie. Sind die Aussagen Richtig oder Falsch?"

print('전처리 전 영어 문장 :', en_sent)
print('전처리 후 영어 문장 :',preprocess_sentence(en_sent))
print('전처리 전 독일어 문장 :', de_sent)
print('전처리 후 독일어 문장 :', preprocess_sentence(de_sent))

전처리 전 영어 문장 : Please select. Are the statements right or wrong?
전처리 후 영어 문장 : Please select . Are the statements right or wrong ?
전처리 전 독일어 문장 : Wählen Sie. Sind die Aussagen Richtig oder Falsch?
전처리 후 독일어 문장 : Wahlen Sie . Sind die Aussagen Richtig oder Falsch ?


In [8]:
# 데이터를 불러오고 전처리 하는 함수
def load_preprocessed_data(file_name):
  encoder_input, decoder_input, decoder_target = [], [], []

  with open(file_name, "r") as lines:
    for i, line in enumerate(lines):
      # source 데이터와 target 데이터 분리
      # 분리된 문장 중 마지막은 라이선스 표기로 학습에 사용하지 않습니다.
      src_line, tar_line, _ = line.strip().split('\t')

      # source 데이터 전처리
      src_line = [w for w in preprocess_sentence(src_line).split()]

      # target 데이터 전처리, sos, eos 표기 추가
      tar_line = preprocess_sentence(tar_line)
      tar_line_in = [w for w in ("<sos> " + tar_line).split()]
      tar_line_out = [w for w in (tar_line + " <eos>").split()]

      encoder_input.append(src_line)
      decoder_input.append(tar_line_in)
      decoder_target.append(tar_line_out)

      if i == num_samples - 1:
        break

  return encoder_input, decoder_input, decoder_target

In [9]:
# 전처리된 데이터 확인
sents_en_in, sents_deu_in, sents_deu_out = load_preprocessed_data("deu.txt")
print('인코더의 입력 :',sents_en_in[:3])
print('디코더의 입력 :',sents_deu_in[:3])
print('디코더의 레이블 :',sents_deu_out[:3])

인코더의 입력 : [['Go', '.'], ['Hi', '.'], ['Hi', '.']]
디코더의 입력 : [['<sos>', 'Geh', '.'], ['<sos>', 'Hallo', '!'], ['<sos>', 'Gru', 'Gott', '!']]
디코더의 레이블 : [['Geh', '.', '<eos>'], ['Hallo', '!', '<eos>'], ['Gru', 'Gott', '!', '<eos>']]


In [10]:
# 영어 토크나이저 설정 (필터링 안함, 소문자 변환 안함)
tokenizer_en = Tokenizer(filters="", lower=False)
# 텍스트 목록을 기반으로 어휘 업데이트
tokenizer_en.fit_on_texts(sents_en_in)
# 텍스트 정수 시퀀스 변환
encoder_input = tokenizer_en.texts_to_sequences(sents_en_in)
# 자리가 남으면 뒤쪽에 padding을 추가 (기본값 0)
encoder_input = pad_sequences(encoder_input, padding="post")

# 독일어 토크나이저 설정 (같은 과정)
tokenizer_deu = Tokenizer(filters="", lower=False)
tokenizer_deu.fit_on_texts(sents_deu_in)
tokenizer_deu.fit_on_texts(sents_deu_out)

decoder_input = tokenizer_deu.texts_to_sequences(sents_deu_in)
decoder_input = pad_sequences(decoder_input, padding="post")

decoder_target = tokenizer_deu.texts_to_sequences(sents_deu_out)
decoder_target = pad_sequences(decoder_target, padding="post")

In [11]:
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)

src_vocab_size = len(tokenizer_en.word_index) + 1
tar_vocab_size = len(tokenizer_deu.word_index) + 1
print("영어 단어 집합의 크기 : {:d}, 독일어 단어 집합의 크기 : {:d}".format(src_vocab_size, tar_vocab_size))

인코더의 입력의 크기(shape) : (40000, 8)
디코더의 입력의 크기(shape) : (40000, 12)
디코더의 레이블의 크기(shape) : (40000, 12)
영어 단어 집합의 크기 : 5761, 독일어 단어 집합의 크기 : 9518


In [12]:
# 단어로부터 정수를 얻는 딕셔너리, 정수로부터 단어를 얻는 딕셔너리
src_to_index = tokenizer_en.word_index
index_to_src = tokenizer_en.index_word
tar_to_index = tokenizer_deu.word_index
index_to_tar = tokenizer_deu.index_word

# 무작위로 순서 변경
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('무작위 시퀀스 :',indices)

encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

print('encoder input 샘플 출력: ', encoder_input[30997])
print('decoder input 샘플 출력: ',decoder_input[30997])
print('decoder target 샘플 출력: ',decoder_target[30997])

무작위 시퀀스 : [18391 27205 15572 ...  8877 26136 29447]
encoder input 샘플 출력:  [128 894  31   0   0   0   0   0]
decoder input 샘플 출력:  [   2  616   29   20 1035    9    0    0    0    0    0    0]
decoder target 샘플 출력:  [ 616   29   20 1035    9    3    0    0    0    0    0    0]


In [13]:
# 훈련 데이터, 테스트 데이터 분리 (훈련 80/ 테스트 20)
n_of_val = int(num_samples*0.2)
encoder_input_train = encoder_input[:-n_of_val]
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)

훈련 source 데이터의 크기 : (32000, 8)
훈련 target 데이터의 크기 : (32000, 12)
훈련 target 레이블의 크기 : (32000, 12)
테스트 source 데이터의 크기 : (8000, 8)
테스트 target 데이터의 크기 : (8000, 12)
테스트 target 레이블의 크기 : (8000, 12)


In [14]:
# 임베딩 차원: 하나의 토큰을 몇 개의 벡터로서 표현할 것인가
embedding_dim = 64
hidden_units = 64

In [15]:
# 인코더
encoder_inputs = Input(shape=(None,))
enc_emb = Embedding(src_vocab_size, embedding_dim)(encoder_inputs)
# 연산에서 0(패딩 값) 제외
enc_masking = Masking(mask_value=0.0)(enc_emb)
# 가변 길이 input과 output, 내부 상태 반환을 위해 return_state = True
encoder_lstm = LSTM(hidden_units, return_state=True)
# 은닉 상태와 셀 상태를 저장
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking)
encoder_states = [state_h, state_c]

In [16]:
# 디코더
decoder_inputs = Input(shape=(None,))
# 재사용하기 위해 레이어 저장
dec_emb_layer = Embedding(tar_vocab_size, hidden_units)
dec_emb = dec_emb_layer(decoder_inputs)
dec_masking = Masking(mask_value=0.0)(dec_emb)
# 모든 시점에 대해서 단어를 예측하기 위해 return_sequences는 True
decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True)
# 인코더의 은닉 상태를 초기 은닉 상태로 사용
decoder_outputs, _, _ = decoder_lstm(dec_masking, initial_state=encoder_states)
# 모든 시점의 결과에 대해서 소프트맥스 함수를 사용한 출력층을 통해 단어 예측
decoder_dense = Dense(tar_vocab_size, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

In [17]:
# 모델 생성
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])
model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train, \
          validation_data=([encoder_input_test, decoder_input_test], decoder_target_test),
          batch_size=128, epochs=50)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.src.callbacks.History at 0x7c9751177ac0>

In [18]:
# 번역할 문장 --(인코더)--> 마지막 시점 은닉 상태, 셀 상태, <sos> --(디코더)--> 예측값 (eos까지 반복하여 생성)

# 학습 후 인코더
encoder_model = Model(encoder_inputs, encoder_states)

# 디코더에 이전 시점의 상태를 보관
decoder_state_input_h = Input(shape=(hidden_units,))
decoder_state_input_c = Input(shape=(hidden_units,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# 훈련 때 사용했던 임베딩 층 재사용
dec_emb2 = dec_emb_layer(decoder_inputs)

# 이전 시점의 상태를 현 시점의 초기 상태로 사용하여 다음 단어 예측
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]

In [19]:
# 모든 시점에 대해서 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2)

In [20]:
# 학습 후 디코더
decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs,
    [decoder_outputs2] + decoder_states2)

# 디코딩 함수
def decode_sequence(input_seq):
  # 입력으로부터 인코더의 마지막 시점의 상태(은닉 상태, 셀 상태)를 얻음
  states_value = encoder_model.predict(input_seq)

  # <sos>에 해당하는 정수 생성
  target_seq = np.zeros((1,1))
  target_seq[0, 0] = tar_to_index['<sos>']

  stop_condition = False
  decoded_sentence = ''

  # stop_condition이 True가 될 때까지 루프 반복
  # 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정
  while not stop_condition:
    # 이전 시점의 상태 states_value를 현 시점의 초기 상태로 사용
    output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

    # 예측 결과를 단어로 변환
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]

    # 현재 시점의 예측 단어를 예측 문장에 추가
    decoded_sentence += ' '+sampled_char

    # <eos>에 도달하거나 정해진 길이를 넘으면 중단
    if (sampled_char == '<eos>' or
        len(decoded_sentence) > 50):
        stop_condition = True

    # 현재 시점의 예측 결과를 다음 시점의 입력으로 사용하기 위해 저장
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = sampled_token_index

    # 현재 시점의 상태를 다음 시점의 상태로 사용하기 위해 저장
    states_value = [h, c]

  return decoded_sentence

In [21]:
# 원래 문장의 정수 시퀀스를 텍스트 시퀀스로 변환하는 함수
def seq_to_src(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0):
      sentence = sentence + index_to_src[encoded_word] + ' '
  return sentence

# 번역한 문장의 정수 시퀀스를 텍스트 시퀀스로 변환하는 함수
def seq_to_tar(input_seq):
  sentence = ''
  for encoded_word in input_seq:
    if(encoded_word != 0 and encoded_word != tar_to_index['<sos>'] and encoded_word != tar_to_index['<eos>']):
      sentence = sentence + index_to_tar[encoded_word] + ' '
  return sentence

In [22]:
# 번역 테스트
for seq_index in [3, 1020, 3204, 6025]:
  input_seq = encoder_input_train[seq_index: seq_index + 1]
  decoded_sentence = decode_sequence(input_seq)

  print("입력문장 :",seq_to_src(encoder_input_train[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_train[seq_index]))
  print("번역문장 :",decoded_sentence[1:-5])
  print("-"*50)

입력문장 : Get a move on ! 
정답문장 : Gib mal Gas ! 
번역문장 : Hol mal eine Minute . 
--------------------------------------------------
입력문장 : Tom was wet . 
정답문장 : Tom war nass . 
번역문장 : Tom war schuchtern . 
--------------------------------------------------
입력문장 : I can t see anyone . 
정답문장 : Ich kann niemanden sehen . 
번역문장 : Ich kann nichts artig . 
--------------------------------------------------
입력문장 : I m ruined . 
정답문장 : Ich bin ruiniert . 
번역문장 : Ich bin ruiniert . 
--------------------------------------------------


In [23]:
# 번역 테스트
for seq_index in [3, 50, 1020, 3204, 6025]:
  input_seq = encoder_input_test[seq_index: seq_index + 1]
  decoded_sentence = decode_sequence(input_seq)

  print("입력문장 :",seq_to_src(encoder_input_test[seq_index]))
  print("정답문장 :",seq_to_tar(decoder_input_test[seq_index]))
  print("번역문장 :",decoded_sentence[1:-5])
  print("-"*50)

입력문장 : I ve got an idea . 
정답문장 : Ich habe eine Idee . 
번역문장 : Ich habe eine gute Arbeit . 
--------------------------------------------------
입력문장 : Close the door . 
정답문장 : Mach die Ture zu . 
번역문장 : Schlie die Tur . 
--------------------------------------------------
입력문장 : I get your drift . 
정답문장 : Ich verstehe was du sagen willst . 
번역문장 : Ich verstehe was ihr sagen . 
--------------------------------------------------
입력문장 : I won t stop you . 
정답문장 : Ich werde dir nichts in den Weg legen . 
번역문장 : Ich werde dich nicht anlugen . 
--------------------------------------------------
입력문장 : I am short of money . 
정답문장 : Mir geht das Geld aus . 
번역문장 : Ich bin knapp bei Kasse . 
--------------------------------------------------
