## 전처리 및 토크나이징

In [1]:
import re
import unicodedata
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Concatenate, Softmax, Dropout, Attention

In [2]:
df1 = pd.read_excel('/content/drive/MyDrive/한국어-영어 번역(병렬) 말뭉치/1_구어체(1).xlsx')
df1 = df1[['원문','번역문']]
df1

In [None]:
df2 = pd.read_excel('/content/drive/MyDrive/한국어-영어 번역(병렬) 말뭉치/1_구어체(2).xlsx')
df2 = df2[['원문','번역문']]
df2

In [4]:
df3 = pd.read_excel('/content/drive/MyDrive/한국어-영어 번역(병렬) 말뭉치/2_대화체.xlsx')
df3 = df3[['원문','번역문']]
df3

In [56]:
# 하이퍼 파라미터
BATCH_SIZE = 256 # Batch size for training.
EPOCHS = 10  # Number of epochs to train for.
HIDDEN_DIM = 256  # Latent dimensionality of the encoding space.
EMBEDDING_DIM = 128
NUM_SAMPLES = 500000  # Number of samples to train on.

In [None]:
# dataset 
hangeul_corpus, eng_corpus = [], []
for row1, row2 in zip(df1.itertuples(), df2.itertuples()):
    # source 데이터와 target 데이터 분리
    hangeul1, eng1 = row1.원문, row1.번역문
    hangeul2, eng2 = row2.원문, row2.번역문
    
    hangeul_corpus.extend([hangeul1, hangeul2])
    eng_corpus.extend([eng1, eng2])

for row3 in df3.itertuples():
    hangeul3, eng3 = row3.원문, row3.번역문

    hangeul_corpus.append(hangeul3)
    eng_corpus.append(eng3)
    
print(len(hangeul_corpus), hangeul_corpus[:10])
print(len(eng_corpus), eng_corpus[:10])

In [58]:
def unicode_hangeul(s):
    # 한글 뗐다 붙이기 -> 컴퓨터가 인식하는 문자열의 길이를 같게 만들어주기 위함
    return unicodedata.normalize('NFKC', s)

In [59]:
def preprocess_sentence(sent):
    # 한글 전처리 함수
    sent = unicode_hangeul(sent)

    # 단어와 구두점 사이에 공백을 만듭니다.
    # Ex) "he is a boy." => "he is a boy ."
    sent = re.sub(r"([?.!])", r" \1", sent)

    # (a-z, A-Z, 가-힣, 0-9, ".", "?", "!", ",", "'") 이들을 제외하고는 전부 공백으로 변환합니다.
    sent = re.sub(r"[^a-zA-Z가-힣ㄱ-ㅎ0-9!.?']+", r" ", sent)

    # 다수 개의 공백을 하나의 공백으로 치환
    sent = re.sub(r"\s+", " ", sent)
    return sent.lower()

In [None]:
# huggingface tokenizer 써봄 -> 한글만, 영어는 그냥 텐서플로 토크나이저로
!pip install tokenizers

In [61]:
from tokenizers import BertWordPieceTokenizer

In [62]:
# huggingface tokenizer는 먼저 학습을 시켜야함. 원문과 번역문을 각각 txt파일로 저장
with open('make_translator[kor].txt', 'w', encoding='utf8') as f:
    f.write('\n'.join(pd.Series(hangeul_corpus)))

In [63]:
hug_kor_tok = BertWordPieceTokenizer(lowercase=False, strip_accents=False)

In [64]:
hug_kor_tok.train(files='/content/make_translator[kor].txt', vocab_size=60000, limit_alphabet=10000, min_frequency=5)

In [65]:
def load_preprocessed_data(hangeul_corpus, eng_corpus): # 여기서 바로 전처리 후 토크나이징(한글), 영어는 띄어쓰기로만
    encoder_input, decoder_input, decoder_target = [], [], []

    for i, (src_line, tar_line) in enumerate(zip(hangeul_corpus, eng_corpus)):
        src_line = preprocess_sentence(src_line)
        src_encoded = hug_kor_tok.encode(src_line)
        src_encoded = src_encoded.tokens

        tar_line = preprocess_sentence(tar_line)
        tar_line_in = [w for w in ("<sos> " + tar_line).split()]
        tar_line_out = [w for w in (tar_line + " <eos>").split()]

        encoder_input.append(src_encoded) # 클리닝 
        decoder_input.append(tar_line_in) # 클리닝 + sos 
        decoder_target.append(tar_line_out) # 클리닝 + eos 

        if i == NUM_SAMPLES - 1:
            break
                    
    return encoder_input, decoder_input, decoder_target

In [66]:
sents_hangeul_in, sents_en_in, sents_en_out  = load_preprocessed_data(hangeul_corpus, eng_corpus)

In [None]:
print('인코더의 입력 :',sents_hangeul_in[-5:])
print('디코더의 입력 :',sents_en_in[-5:])
print('디코더의 레이블 :',sents_en_out[-5:])

In [68]:
# 순서처리로 텐서플로 토크나이저 이용, 한글
tokenizer_kor = Tokenizer(filters="", lower=False)
tokenizer_kor.fit_on_texts(sents_hangeul_in)
encoder_input = tokenizer_kor.texts_to_sequences(sents_hangeul_in)

In [69]:
# 순서처리로 텐서플로 토크나이저 이용, 영어
tokenizer_eng = Tokenizer(filters="", lower=False)
tokenizer_eng.fit_on_texts(sents_en_in)
tokenizer_eng.fit_on_texts(sents_en_out)

# 디코더 데이터
decoder_input = tokenizer_eng.texts_to_sequences(sents_en_in)
decoder_target = tokenizer_eng.texts_to_sequences(sents_en_out)

In [None]:
SRC_VOCAB_SIZE = len(tokenizer_kor.word_index) + 1
TAR_VOCAB_SIZE = len(tokenizer_eng.word_index) + 1

print(f"한글 단어 집합의 크기 : {SRC_VOCAB_SIZE}, 영어 단어 집합의 크기 : {TAR_VOCAB_SIZE}")

In [72]:
encoder_input = pad_sequences(encoder_input, padding='post')
decoder_input = pad_sequences(decoder_input, padding='post')
decoder_target = pad_sequences(decoder_target, padding='post')

In [None]:
print('인코더의 입력의 크기(shape) :',encoder_input.shape)
print('디코더의 입력의 크기(shape) :',decoder_input.shape)
print('디코더의 레이블의 크기(shape) :',decoder_target.shape)

# 변수 저장
MAX_ENC_LEN, MAX_DEC_LEN = encoder_input.shape[1], decoder_input.shape[1]

In [74]:
src2idx = tokenizer_kor.word_index  # word : idx
idx2src = tokenizer_kor.index_word  # idx : word
tar2idx = tokenizer_eng.word_index # word : idx
idx2tar = tokenizer_eng.index_word # idx : word

In [None]:
print(tar2idx)
print(idx2tar)

In [None]:
# 랜덤 인덱스 생성 
indices = np.arange(encoder_input.shape[0])
np.random.shuffle(indices)
print('랜덤 시퀀스 :',indices)

# 랜덤하게 섞기
encoder_input = encoder_input[indices]
decoder_input = decoder_input[indices]
decoder_target = decoder_target[indices]

print(encoder_input[indices][0])
print(decoder_input[indices[0]])
print(decoder_target[indices[0]])

In [77]:
n_of_val = int(NUM_SAMPLES*0.1) # 20000

# train data
encoder_input_train = encoder_input[:-n_of_val] 
decoder_input_train = decoder_input[:-n_of_val]
decoder_target_train = decoder_target[:-n_of_val]

# test data
encoder_input_test = encoder_input[-n_of_val:]
decoder_input_test = decoder_input[-n_of_val:]
decoder_target_test = decoder_target[-n_of_val:]

In [None]:
print('훈련 source 데이터의 크기 :',encoder_input_train.shape)
print('훈련 target 데이터의 크기 :',decoder_input_train.shape)
print('훈련 target 레이블의 크기 :',decoder_target_train.shape)
print('테스트 source 데이터의 크기 :',encoder_input_test.shape)
print('테스트 target 데이터의 크기 :',decoder_input_test.shape)
print('테스트 target 레이블의 크기 :',decoder_target_test.shape)

In [79]:
# 인코더
# input, layer
encoder_inputs = Input(shape = (MAX_ENC_LEN,))
enc_emb_layer = Embedding(SRC_VOCAB_SIZE, EMBEDDING_DIM, name='ENC_Embedding')
enc_dropout = Dropout(0.2, name='ENC_Dropout')
enc_lstm = LSTM(HIDDEN_DIM, return_state=True, return_sequences=True, name='ENC_LSTM')

# graph
enc_emb = enc_emb_layer(encoder_inputs)
enc_emb = enc_dropout(enc_emb)
encoder_outputs, enc_h, enc_c = enc_lstm(enc_emb)
encoder_states = [enc_h, enc_c]

In [80]:
# 디코더
# input, layer
decoder_inputs = Input(shape = (MAX_DEC_LEN,))
dec_emb_layer = Embedding(TAR_VOCAB_SIZE, EMBEDDING_DIM, name='DEC_Embedding')
dec_dropout = Dropout(0.2, name='DEC_Dropout')
dec_lstm = LSTM(HIDDEN_DIM, return_state=True, return_sequences=True, name='DEC_LSTM')
att = Attention()
dense_tanh = Dense(HIDDEN_DIM, activation = 'tanh')
dec_dense = Dense(TAR_VOCAB_SIZE, activation='softmax', name='DEC_Dense')
dec_emb = dec_emb_layer(decoder_inputs)
dec_emb = dec_dropout(dec_emb)
decoder_output_, dec_h, dec_c = dec_lstm(dec_emb, initial_state=encoder_states)

# 어텐션을 구현해보자
# attention_score = tf.matmul(dec_h, encoder_outputs, transpose_b=True)
# attention_weight = tf.nn.softmax(attention_score)
# attention_values = tf.matmul(attention_weight, encoder_outputs)

# 어텐션 클래스를 사용해보자
context_vector = att([decoder_output_, encoder_outputs])
concat = dense_tanh(Concatenate(axis=-1)([context_vector, decoder_output_]))
decoder_outputs = dec_dense(concat)

In [81]:
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])

In [None]:
model.summary()

In [83]:
earlystopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', restore_best_weights=True, patience=3)

In [84]:
# 체크포인트로 현재 모델의 베스트 weight 저장
checkpoint_path = '/content/drive/MyDrive/checkpoint.h5'
checkpoint = ModelCheckpoint(filepath=checkpoint_path, 
                             save_weights_only=True,
                             save_best_only=True, 
                             monitor='val_loss', 
                             verbose=1
                            )

In [85]:
# 연속하여 학습시 체크포인트를 로드하여 이어서 학습. 만약에 체크포인트 파일이 없는 초기상태라면 해당 코드는 비활성화.
model.load_weights(checkpoint_path)

In [None]:
history = model.fit(x = [encoder_input_train, decoder_input_train], 
          y = decoder_target_train,
          validation_data = ([encoder_input_test, decoder_input_test], decoder_target_test),
          batch_size = BATCH_SIZE, 
          callbacks = [earlystopping, checkpoint],
          epochs = EPOCHS)

In [None]:
import matplotlib.pyplot as plt

def plot_history(history):
  hist = pd.DataFrame(history.history)
  hist['epoch'] = history.epoch

  plt.figure(figsize=(12, 6))

  plt.subplot(1,2,1)
  plt.xlabel('Epoch')
  plt.ylabel('Loss')
  plt.plot(hist['epoch'], hist['loss'],
           label='Train Loss')
  plt.plot(hist['epoch'], hist['val_loss'],
           label = 'Val Loss')
  plt.ylim([0,5])
  plt.subplot(1,2,2)
  plt.xlabel('Epoch')
  plt.ylabel('Accuracy')
  plt.plot(hist['epoch'], hist['acc'],
           label='Train Acc')
  plt.plot(hist['epoch'], hist['val_acc'],
           label = 'Val Acc')
  plt.ylim([0,20])
  plt.legend()
  plt.show()

plot_history(history)

In [88]:
# 인코더(predict)
encoder_model = Model(encoder_inputs, [encoder_outputs, encoder_states])

In [89]:
# 디코더(predict)

# Input Tensors : 이전 시점의 상태를 보관할 텐서
decoder_input_h = Input(shape=(HIDDEN_DIM,))
decoder_input_c = Input(shape=(HIDDEN_DIM,))

decoder_states_inputs = [decoder_input_h, decoder_input_c]

# 훈련 때 사용했던 임베딩 층을 재사용
x = dec_emb_layer(decoder_inputs)

# 다음 단어 예측을 위해 이전 시점의 상태를 현 시점의 초기 상태로 사용
x, state_h2, state_c2 = dec_lstm(x, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]

# 수정된 디코더
attn_layer = att([x, encoder_outputs])
decoder_concat = Concatenate(axis=-1)([attn_layer, x])
attn_out = dense_tanh(decoder_concat)
decoder_outputs = dec_dense(attn_out)

decoder_model = Model(
    [decoder_inputs] + decoder_states_inputs + [encoder_outputs],
    [decoder_outputs] + decoder_states2)

In [90]:
def translate(sentence):
    sentence = preprocess_sentence(sentence)
    tokens = hug_kor_tok.encode(sentence).tokens

    # 입력 문장 토큰 -> 라벨링
    enc_input = tokenizer_kor.texts_to_sequences([tokens])

    # 입력 문장 라벨링 -> 패딩 
    enc_input = tf.keras.preprocessing.sequence.pad_sequences(enc_input, maxlen=MAX_ENC_LEN, padding='post')
    encoder_output, states_value = encoder_model.predict(enc_input)

    # Decoder input인 <SOS>에 해당하는 정수 생성
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = tar2idx['<sos>']

    # prediction 시작
        # stop_condition이 True가 될 때까지 루프 반복
        # 구현의 간소화를 위해서 이 함수는 배치 크기를 1로 가정합니다.
    stop_condition = False
    decoded_sentence = ''

    for t in range(MAX_DEC_LEN):

        # 이전 시점의 상태 states_value를 현 시점의 초기 상태로 사용
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value + [encoder_output], verbose = 0)

        # 예측 결과를 단어로 변환
        result_token_index = np.argmax(output_tokens[0, -1, :])
        result_word = idx2tar[result_token_index]

        # 현재 시점의 예측 단어를 예측 문장에 추가
        decoded_sentence += ' ' + result_word

        # 현재 시점의 예측 결과 -> 다음 시점의 입력으로 업데이트
        target_seq = np.zeros((1,1))
        target_seq[0, 0] = result_token_index

        # 현재 시점의 상태 ->  다음 시점의 상태로 업데이트
        states_value = [h, c]

        #  Stop condition <eos>에 도달하면 중단.
        if result_word == '<eos>':
            break 

    return decoded_sentence.strip('<eos>')

In [126]:
train_idx = indices[:-n_of_val]
test_idx = indices[-n_of_val:]

In [None]:
n_samples = 5
np.random.choice(train_idx, n_samples)

In [None]:
# train data - translate
n_samples = 5
for idx in np.random.choice(train_idx, n_samples):
    test_sentence = hangeul_corpus[idx]
    answer_sentence = eng_corpus[idx]
    decoded_sentence = translate(test_sentence)

    print("입력문장 :", test_sentence)
    print("정답문장 :", answer_sentence) 
    print("번역문장 :", decoded_sentence)
    print("-"*50)

In [129]:
# !pip install evaluate

In [None]:
from tqdm.notebook import tqdm 

n_samples = 50
ref_train, pred_train = [], []
for idx in tqdm(np.random.choice(test_idx, n_samples)):
    ref_train.append(eng_corpus[idx])
    pred_train.append(translate(hangeul_corpus[idx]))

In [None]:
print(ref_train) 
print(pred_train)

In [132]:
import evaluate
bleu = evaluate.load("bleu")

In [None]:
bleu.compute(predictions=pred_train, references=ref_train)