In [None]:
# 구글 드라이브와 연결

#from google.colab import auth
#auth.authenticate_user()

from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# 경로 설정
path = '/content/gdrive/My Drive/pytest/fra-eng/'

!ls '/content/gdrive/My Drive/pytest/fra-eng/'

In [None]:
# 데이터 확인
# 코랩의 메모리 용량 문제가 있어 small 데이터로 진행한다.
import pandas as pd
data = pd.read_csv(path+'fra-eng_small.txt', names=['source', 'target'], sep='\t', encoding='utf-8')
print('data length:', len(data))          # 268
print('data type:', type(data))
print('data shape:', data.shape)
print('data sample:\n', data.sample(5))

In [None]:
print('data.target length:', len(data.target))          # 268
print('data.target type:', type(data.target))           # Series
print('data.target shape:', data.target.shape)
print('data.target sample:\n', data.target.sample(5))

In [None]:
# 시작부호와 종료부호 부착
# 데이터가 모두 3종이 필요하다. source 언어에는 encoder_input 1개, target 언어에는 decoder_input, decoder_target 2개이다
# encoder는 source 언어를 그대로 사용하면 되나, decoder는 seq2seq의 사용을 위해 \t, \n를 부착해야 한다
# decoder_input 데이터의 시작에는 \t, 문장의 끝에는 \n를 부착한다
# decoder_target 데이터는 \n만 필요하다
# tab을 받았기 때문에 자료가 더 오른쪽에서 출력되는 것을 알 수 있다
data.target_input = data.target.apply(lambda x : '\t'+x+'\n')
data.target_target = data.target.apply(lambda x : x+'\n')
print('\nnew data sample:\n', data.sample(5))

In [None]:
print('data.target_input length:', len(data.target_input))          # 268
print('data.target_input type:', type(data.target_input))           # Series
print('data.target_input shape:', data.target_input.shape)          # (268,)
print('data.target_input sample:\n', data.target_input.sample(5))

In [None]:
# 문장의 길이 maxlen 설정하기
# source와 target 문장의 최대 길이를 구한다
source_sentence_max_length = data.source.apply(lambda x: len(x)).max()
print('source sentence max length: ', source_sentence_max_length)               # 9
target_sentence_max_length = data.target_input.apply(lambda x: len(x)).max()-2  # "\t, \n"의 길이는 제외
print('target sentence max length: ', target_sentence_max_length)               # 30

max_src_len = source_sentence_max_length			                                  # source 문장의 최대 음절 길이로 maxlen을 설정한다
max_tar_len = target_sentence_max_length                                        # target 문장의 최대 음절 길이로 maxlen을 설정한다

In [None]:
# Data Tokenizing
# 각 문자 종류에 대하여 숫자값을 배당한다
%tensorflow_version 2.x
from tensorflow.keras.preprocessing.text import Tokenizer

# source 언어 Tokenizing
tokenizer_source = Tokenizer(num_words=None, char_level=True, lower=False)      # Tokenizer 객체 생성
tokenizer_source.fit_on_texts(data.source)     	                                # 인덱스를 구축한다
word_index_source = tokenizer_source.word_index                                 # 글자와 인덱스의 쌍을 가져온다

print('\n전체에서 %s개의 고유한 토큰을 찾았습니다.' % len(word_index_source))   # 91
print('word_index_source: ', word_index_source)

In [None]:
# target 언어 Tokenizing
# target 언어로는 target_input으로 하나만 만들면 된다
tokenizer_target = Tokenizer(num_words=None, char_level=True, lower=False)      # Tokenizer 객체 생성
tokenizer_target.fit_on_texts(data.target_input)     	                          # 인덱스를 구축한다
word_index_target = tokenizer_target.word_index                                 # 글자와 인덱스의 쌍을 가져온다

print('\n전체에서 %s개의 고유한 토큰을 찾았습니다.' % len(word_index_target))   # 116
print('word_index_target: ', word_index_target)

In [None]:
# Data Sequencing
# 배당된 숫자를 이용하여 각 문장의 문자를 숫자로 치환한다
# source 언어 Sequencing
# 문제가 있으면 오른쪽과 같이 시작한다. encoder_input = tokenizer_source.texts_to_sequences(list(data.source)) 
encoder_input = tokenizer_source.texts_to_sequences(data.source) 

print('\nResult of encoder_input sequencing: ')
print(data.source[0], encoder_input[0])
print(data.source[1], encoder_input[1])
print(data.source[2], encoder_input[2])
print(data.source[3], encoder_input[3])

In [None]:
# target 언어 Sequencing
decoder_input = tokenizer_target.texts_to_sequences(data.target_input)
decoder_target = tokenizer_target.texts_to_sequences(data.target_target)

print('\nResult of decoder_input sequencing: ')
print(data.target_input[0], decoder_input[0])
print(data.target_input[1], decoder_input[1])
print(data.target_input[2], decoder_input[2])

print('\nResult of decoder_target sequencing: ')
print(data.target_target[0], decoder_target[0])
print(data.target_target[1], decoder_target[1])
print(data.target_target[2], decoder_target[2])

In [None]:
print('data.source type:', type(data.source))              # Series
print('encoder_input type:', type(encoder_input))          # list

print('data.source:\n', data.source)
print('encoder_input\n:', encoder_input)

In [None]:
# Data Padding
from tensorflow.keras.preprocessing.sequence import pad_sequences
encoder_input = pad_sequences(encoder_input, maxlen=max_src_len, padding='post')
decoder_input = pad_sequences(decoder_input, maxlen=max_tar_len, padding='post')
decoder_target = pad_sequences(decoder_target, maxlen=max_tar_len, padding='post')

print('\nPadding result sample: ')
print(data.target_input[0], decoder_input[0])

In [None]:
print('decoder_input length:', len(decoder_input))          # 268
print('decoder_input type:', type(decoder_input))           # numpy.ndarray
print('decoder_input shape:', decoder_input.shape)          # (268, 30)

In [None]:
# One-Hot-Encoding
# 케라스 원-핫 인코딩을 수행한다.
# 클래스의 수는 1을 올려주어야 한다(Padding으로 생긴 0을 추가로 받아야 함)
from tensorflow.keras.utils import to_categorical
encoder_input = to_categorical(encoder_input, num_classes=len(word_index_source)+1)
decoder_input = to_categorical(decoder_input, num_classes=len(word_index_target)+1)
decoder_target = to_categorical(decoder_target, num_classes=len(word_index_target)+1)

print('\nResult of One-Hot Encodded decoder_input sequencing: ')
print(decoder_input.shape)
print(data.target_input[0], decoder_input[0])

In [None]:
print('0-0\n', decoder_input[0][0])
print('\n0-1\n', decoder_input[0][1])
print('\n0-2\n', decoder_input[0][2])
print('\n0-28\n', decoder_input[0][28])
print('\n0-29\n', decoder_input[0][29])

In [None]:
print('decoder_input length:', len(decoder_input))          # 268
print('decoder_input type:', type(decoder_input))           # numpy.ndarray
print('decoder_input shape:', decoder_input.shape)          # (268, 30, 70)

In [None]:
# 교사 강요를 이용한 모델 훈련
# 예측 과정에서는 이전 시점의 decoder_input의 예측 결과를 현재 시점의 decoder_input으로 넣을 것이다
# 그러나 훈련 과정에서는 그렇게 하면 잘못 예측된 이전 시점의 결과가 현재 시점으로 들어가게 된다
# 따라서 훈련 과정에서는 이전 시점의 decoder_input의 실제값을 현재 시점의 decoder_input으로 넣을 것이다
# 이를 교사 강요라고 한다
# 현재 Input이 3D이므로 Embedding으로 차원을 늘려주지 않아도 바로 3D input shape을 갖는 순환신경망을 이용할 수 있다

# Context Vector 만들기
# 훈련용 Encoder
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense

encoder_inputs = Input(shape=(None, len(word_index_source)+1))      # 입력문의 길이가 문장마다 다르므로 None. 출력되는 내용은 최대 len(word_index_source)+1
encoder_lstm = LSTM(units=256, return_state=True)                   # encoder 내부 상태를 decoder로 넘겨주기 위해 return_state=True
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs)    # return_state=True로 만들어진 모델이므로 은닉상태와 셀상태를 받는다. encoder_outputs는 사용하지 않는다
encoder_states = [state_h, state_c]                                 # 은닉상태와 셀상태를 받는다

In [None]:
# 훈련용 Decoder
decoder_inputs = Input(shape=(None, len(word_index_target)+1))
decoder_lstm = LSTM(units=256, return_sequences=True, return_state=True)            # 256 unit으로 된 encoder_states를 받아야 하므로 decoder의 은닉 상태도 256으로 동일하게 맞춰준다
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)  # decoder의 첫 상태를 encoder의 은닉 상태와 셀 상태로 한다
decoder_dense = Dense(len(word_index_target)+1, activation='softmax')               # decoder의 은닉상태와 셀상태는 훈련 과정에서는 사용하지 않는다(_)
decoder_outputs = decoder_dense(decoder_outputs)                                    # 출력층의 크기는 번역문의 글자(또는 단어)가 가질 수 있는 크기이다

In [None]:
# 모델 훈련
# full data를 이용하여 epochs를 50회 정도 주어야 제대로 결과가 나온다.
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)
model.compile(optimizer='rmsprop', loss='categorical_crossentropy')
model.fit(x=[encoder_input, decoder_input], y=decoder_target, batch_size=64, epochs=1, validation_split=0.2)

In [None]:
# 예측용 Encoder
# 입력된 문장을 인코더에 넣어서 은닉상태와 셀상태를 얻는다
# encoder_inputs, encoder_states는 훈련용에서 구성한 것을 사용한다
encoder_model = Model(inputs=encoder_inputs, outputs=encoder_states)

In [None]:
# 예측용 Decoder
# Decoder가 산출한 결과를 받는 상태 벡터를 정의
decoder_state_input_h = Input(shape=(256,))
decoder_state_input_c = Input(shape=(256,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# 문장의 다음 단어를 예측하기 위해서 은닉상태와 셀상태를 받음
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = Model(inputs=[decoder_inputs] + decoder_states_inputs, outputs=[decoder_outputs] + decoder_states)

In [None]:
# word로부터 idx를 얻는 것을 idx로부터 word를 얻는 것으로 바꿈
index_to_src = dict((i, char) for char, i in word_index_source.items())
index_to_tar = dict((i, char) for char, i in word_index_target.items())
print(index_to_tar)

In [None]:
def decode_sequence(input_seq):
    states_value = encoder_model.predict(input_seq)           # 입력문을 인코더에 넣어 문장의 상태 벡터를 얻는다    
    target_seq = np.zeros((1, 1, len(word_index_target)+1))   # 디코더를 초기화한다
    target_seq[0, 0, word_index_target['\t']] = 1.            # 디코더의 첫 시작은 <\t>로 시작하므로 \t 위치에 원-핫 인코딩으로 기록
    
    stop_condition = False
    decoded_sentence = ""
    
    #stop_condition이 True가 될 때까지 루프 반복
    while not stop_condition:
      output_tokens, h, c = decoder_model.predict([target_seq] + states_value)  # target_input과 문장의 상태 벡터 입력
      sampled_token_index = np.argmax(output_tokens)          # 70개의 결과 중 가장 큰 값을 갖는 인덱스를 선택
      
      # sampled_token_index가 0이 나오면, index_to_tar 0은 없으므로 에러가 발생한다.
      # 이 경우 공백을 갖는 index 1로 치환한다.
      # 대량의 데이터에서는 확률상 0이 나오지 않는다
      # 소량의 데이터에서는 대부분 0이 나온다.
      # 소량의 데이터에서 index_to_tar 0으로 치우치는 이유는, 1~69 어떠한 것도 확률이 낮기 때문에
      # 전체의 합을 1로 만들기 위해서, 빈 영역인 0번에 가장 높은 값이 들어가기 때문이다.
      # 즉, 0으로 나왔다는 것은 예측기가 어떠한 음절도 가능성이 낮은 것으로 본다 라고 해석할 수 있다
      if(sampled_token_index==0):  
        sampled_token_index = 1
        
      sampled_char = index_to_tar[sampled_token_index]
      decoded_sentence += sampled_char

      # 종료표시인 <\n>에 도달하거나 최대 길이를 넘으면 중단.
      if (sampled_char == '\n' or len(decoded_sentence) > max_tar_len):
        stop_condition = True

      # 길이가 1인 타겟 시퀀스를 업데이트 합니다.
      target_seq = np.zeros((1, 1, len(word_index_target)+1))   # target_input 초기화
      target_seq[0, 0, sampled_token_index] = 1.                # 직전 예측 결과를 sampled_token_index 위치에 원-핫 인코딩으로 기록

      # 상태를 업데이트 합니다.
      states_value = [h, c]

    return decoded_sentence

In [None]:
import numpy as np
for seq_index in [4, 51, 101]: # 입력 문장의 인덱스
  input_seq = encoder_input[seq_index:seq_index+1]  # 3차원 배열에서는 이와 같이 [n:n+1] 형태로 해주어야 3차원이 유지되면서 n번째가 출력된다
  decoded_sentence = decode_sequence(input_seq)
  
  print(35 * "-")
  print('입력 문장:', data.source[seq_index])
  print('정답 문장:', data.target[seq_index][:len(data.target[seq_index])])
  print('번역기가 번역한 문장:', decoded_sentence[:len(decoded_sentence)-1])    # '\n'은 빼고 출력

In [None]:
# 1개 문장 예측하기
# 전처리
input_seq_1 = tokenizer_source.texts_to_sequences([list('Run!')])
input_seq_1 = pad_sequences(input_seq_1, maxlen=max_src_len, padding='post')
input_seq_1 = to_categorical(input_seq_1, num_classes=len(word_index_source)+1)

# 예측하기 
decoded_sentence = decode_sequence(input_seq_1)
print(decoded_sentence)