<a href="https://colab.research.google.com/github/hyunyoungDA/KcELECTRA-fine-tuning/blob/main/08_MT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 기계 번역

영-한 기계번역을 통한 인코더-디코더 모델 실습. 학습 데이터는 http://www.manythings.org/anki/kor-eng.zip 에서 다운로드.
참고: https://wikidocs.net/86900

이 학습 데이터가 적절해서라기보다는 소규모로 간단히 실습을 진행할 수 있어서 사용함.

## 데이터 준비

In [None]:
import numpy as np
import pandas as pd
import re
import shutil
import os
import unicodedata
import requests
import zipfile

In [None]:
# 결과 재현을 위한 seed값 설정
np.random.seed(17)

In [None]:
url ='http://www.manythings.org/anki/kor-eng.zip'
filename = 'kor-eng.zip'
path = os.path.join(os.getcwd(), 'rsc/kor-eng')
text_file = os.path.join(path, 'kor.txt')

In [None]:
# 영어-한국어 병렬 코퍼스 다운로드
# kor-eng 폴더에 압축파일을 다운로드하여 압축 해제
# 이 코퍼스는 병렬 코퍼스를 사용한 번역 학습 과정을 샘플로 보이기 위한 것으로서,
# 품질이 좋은 코퍼스는 아님.

def download(url, output_file_name):
    '''URL 다운로드하여 output_file_name에 저장'''
    with open(output_file_name, 'wb') as out:
        r = requests.get(url, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36'})
        print(r)
        out.write(r.content)

if not os.path.exists(path):
    os.makedirs(path)

zip_file_name = os.path.join(path, filename)
if not os.path.exists(zip_file_name):
    download(url, zip_file_name)

if not os.path.exists(text_file):
    with zipfile.ZipFile(zip_file_name, 'r') as zipf:
        zipf.extractall(path)

In [None]:
import re

# 문장 전처리 함수
def preprocess_sentence(sent):
    # 단어와 구두점 사이에 공백을 끼워 넣음. 예: "he is a boy." -> "he is a boy ."
    sent = re.sub(r"([?.!,])", r" \1", sent)

    # [단어 구성 문자들, ".", "?", "!", ","]를 제외한 나머지 문자는 모두 공백으로 변환
    sent = re.sub(r"[^\w!.?,]+", r" ", sent)

    # 연속된 공백은 하나로 변환
    sent = re.sub(r"\s+", " ", sent)

    return sent

In [None]:
preprocess_sentence("I knew that Tom was just a freshman, so I was surprised to see him hanging out with seniors.")

'I knew that Tom was just a freshman , so I was surprised to see him hanging out with seniors .'

In [None]:
preprocess_sentence("난 톰이 그냥 신입생일 뿐이라고만 알았는데, 그러다보니 톰이랑 선배들이 서로 어울려다니는 걸 보고 놀랐어.")

'난 톰이 그냥 신입생일 뿐이라고만 알았는데 , 그러다보니 톰이랑 선배들이 서로 어울려다니는 걸 보고 놀랐어 .'

In [None]:
# 병렬 코퍼스를 전처리하여 로딩(영한 번역용)
# 파일 내용은 영어-한글 순으로 작성되어 있음
def load_preprocessed_data(fname):
    encoder_input, decoder_input, decoder_target = [], [], []

    with open(fname, "r", encoding='utf8') as f:
        for line in f.readlines():
            # source 데이터와 target 데이터 분리
            src_line, tar_line, _ = line.strip().split('\t')

            # source 데이터 전처리(영어)
            src_line_input = [w for w in preprocess_sentence(src_line).split()]

            # target 데이터 전처리(한국어)
            tar_line = preprocess_sentence(tar_line)
            tar_line_input = [w for w in ("<sos> " + tar_line).split()]  # 디코더 입력용
            tar_line_target = [w for w in (tar_line + " <eos>").split()]  # 출력용

            encoder_input.append(src_line_input)
            decoder_input.append(tar_line_input)
            decoder_target.append(tar_line_target)

    # (인코더 입력, 디코더 입력, 디코더 출력) 리턴
    return encoder_input, decoder_input, decoder_target

In [None]:
sents_src_in, sents_tar_in, sents_tar_out = load_preprocessed_data(text_file)
print(sents_src_in[:5])
print(sents_tar_in[:5])
print(sents_tar_out[:5])

[['Go', '.'], ['Hi', '.'], ['Run', '!'], ['Run', '.'], ['Who', '?']]
[['<sos>', '가', '.'], ['<sos>', '안녕', '.'], ['<sos>', '뛰어', '!'], ['<sos>', '뛰어', '.'], ['<sos>', '누구', '?']]
[['가', '.', '<eos>'], ['안녕', '.', '<eos>'], ['뛰어', '!', '<eos>'], ['뛰어', '.', '<eos>'], ['누구', '?', '<eos>']]


In [None]:
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer

# 정수 인코딩
# 토큰 제거 없이 모두 사용
tok_src = Tokenizer(filters="", lower=False)
tok_src.fit_on_texts(sents_src_in)
encoder_input = tok_src.texts_to_sequences(sents_src_in)

tok_tar = Tokenizer(filters="", lower=False)
tok_tar.fit_on_texts(sents_tar_in)
tok_tar.fit_on_texts(sents_tar_out)
decoder_input = tok_tar.texts_to_sequences(sents_tar_in)
decoder_target = tok_tar.texts_to_sequences(sents_tar_out)

In [None]:
from tensorflow.keras.preprocessing.sequence import pad_sequences

# 패딩
encoder_input = pad_sequences(encoder_input, padding="post")
decoder_input = pad_sequences(decoder_input, padding="post")
decoder_target = pad_sequences(decoder_target, padding="post")

In [None]:
print(encoder_input.shape)
print(decoder_input.shape)
print(decoder_target.shape)

(5890, 110)
(5890, 96)
(5890, 96)


In [None]:
print(encoder_input[100])

[1932  307   41    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0]


In [None]:
src_vocab_size = len(tok_src.word_index) + 1
tar_vocab_size = len(tok_tar.word_index) + 1
print(src_vocab_size)
print(tar_vocab_size)

3480
7965


In [None]:
# 단어 -> 정수, 정수 -> 단어 딕셔너리 준비
src_to_index = tok_src.word_index
index_to_src = tok_src.index_word

tar_to_index = tok_tar.word_index
index_to_tar = tok_tar.index_word

In [None]:
# 데이터 셔플링 준비: 샘플 id 셔플
idx = np.arange(encoder_input.shape[0])
np.random.shuffle(idx)
print(idx)

[ 488 4207  533 ... 5510 2191 2671]


In [None]:
# 셔플된 샘플 id 순서대로 재배치
encoder_input = encoder_input[idx]
decoder_input = decoder_input[idx]
decoder_target = decoder_target[idx]

In [None]:
encoder_input[100]

array([   2,   16, 1113,    6,    1,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0],
      dtype=int32)

In [None]:
decoder_input[100]

array([   2,    9,   33, 1070,    7,    1,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0], dtype=int32)

In [None]:
decoder_target[100]

array([   9,   33, 1070,    7,    1,    3,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0], dtype=int32)

In [None]:
# 10%를 검증 데이터로 사용
num_of_val = int(encoder_input.shape[0]*0.1)
num_of_val

589

In [None]:
# 학습 집합과 검증 집합 분리
encoder_input_train = encoder_input[:-num_of_val]
decoder_input_train = decoder_input[:-num_of_val]
decoder_target_train = decoder_target[:-num_of_val]

encoder_input_val = encoder_input[-num_of_val:]
decoder_input_val = decoder_input[-num_of_val:]
decoder_target_val = decoder_target[-num_of_val:]

In [None]:
print(encoder_input_train.shape, encoder_input.dtype)
print(decoder_input_train.shape, decoder_input_train.dtype)
print(decoder_target_train.shape, decoder_target_train.dtype)
print(encoder_input_val.shape, encoder_input_val.dtype)
print(decoder_input_val.shape, decoder_input_val.dtype)
print(decoder_target_val.shape, decoder_target_val.dtype)

(5301, 110) int32
(5301, 96) int32
(5301, 96) int32
(589, 110) int32
(589, 96) int32
(589, 96) int32


## 기계번역 모델 학습

In [None]:
from tensorflow.keras.layers import Input, LSTM, Embedding, Dense, Masking
from tensorflow.keras.models import Model

In [None]:
# 임베딩 벡터 크기 및 LSTM 은닉 상태 크기 설정
# 실제 문제에 적용할 때는 적절한 값으로 변경 필요
embedding_dim = 300
hidden_units = 200

In [None]:
# 인코더 설정
encoder_inputs = Input(shape=(None,), dtype='int32', name='enc_input')  # 입력층
enc_emb =  Embedding(src_vocab_size, embedding_dim, name='enc_emb')(encoder_inputs)  # 임베딩 층
enc_masking = Masking(mask_value=0.0, name='enc_mask')(enc_emb)  # 패딩 마스킹: 패딩 토큰인 0은 연산에서 제외
encoder_lstm = LSTM(hidden_units, return_state=True, name='enc_lstm')  # 마지막 상태를 디코더로 전달하기 위해 return_state는 True
encoder_outputs, state_h, state_c = encoder_lstm(enc_masking)  # 은닉 상태와 셀 상태를 리턴
encoder_states = [state_h, state_c]  # 인코더의 은닉 상태와 셀 상태를 저장
print(encoder_outputs.shape, state_h.shape, state_c.shape)

(None, 200) (None, 200) (None, 200)


In [None]:
# 디코더 설정
decoder_inputs = Input(shape=(None,), dtype='int32', name='dec_input')  # 입력층
dec_emb_layer = Embedding(tar_vocab_size, embedding_dim, name='dec_emb')  # 임베딩 층
dec_emb = dec_emb_layer(decoder_inputs)
dec_masking = Masking(mask_value=0.0, name='dec_mask')(dec_emb)  # 패딩 마스킹: 패딩 토큰인 0은 연산에서 제외

# 상태값 리턴을 위해 return_state는 True, 모든 타임스텝에 대해서 단어를 예측하기 위해 return_sequences는 True
decoder_lstm = LSTM(hidden_units, return_sequences=True, return_state=True, name='dec_lstm')

# 인코더의 마지막 상태를 디코더의 초기 상태로 사용
decoder_outputs, _, _ = decoder_lstm(dec_masking, initial_state=encoder_states)

# 모든 타임스텝의 결과에 대해서 소프트맥스 함수를 사용한 출력층을 통해 단어 예측
decoder_dense = Dense(tar_vocab_size, activation='softmax', name='dec_dense')
decoder_outputs = decoder_dense(decoder_outputs)
print(decoder_outputs.shape)

(None, None, 7965)


In [None]:
# 학습 모델 정의
# 학습 시에는 디코더 입력으로 정답(decoder_inputs)을 사용함 (teacher forcing)
# 입력: [인코더 입력, 디코더 입력]
# 출력: 디코더 출력
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# 다중 클래스 분류 문제.
# 레이블이 원-핫 인코딩 되지 않은 정수값이므로, 손실 함수는 sparse_categorical_crossentropy
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['acc'])

model.summary()

In [None]:
history = model.fit(x=[encoder_input_train, decoder_input_train], y=decoder_target_train,
              validation_data=([encoder_input_val, decoder_input_val], decoder_target_val),
              batch_size = 64, epochs = 200)

Epoch 1/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 84ms/step - acc: 0.8803 - loss: 3.6395 - val_acc: 0.9453 - val_loss: 0.3953
Epoch 2/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 78ms/step - acc: 0.9471 - loss: 0.3916 - val_acc: 0.9565 - val_loss: 0.3693
Epoch 3/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 79ms/step - acc: 0.9553 - loss: 0.3669 - val_acc: 0.9570 - val_loss: 0.3583
Epoch 4/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 81ms/step - acc: 0.9553 - loss: 0.3561 - val_acc: 0.9570 - val_loss: 0.3538
Epoch 5/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 82ms/step - acc: 0.9556 - loss: 0.3451 - val_acc: 0.9570 - val_loss: 0.3511
Epoch 6/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 80ms/step - acc: 0.9553 - loss: 0.3413 - val_acc: 0.9570 - val_loss: 0.3494
Epoch 7/200
[1m83/83[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m7s[0m 80ms/ste

## 기계번역 추론/예측
학습 결과 적용하여 번역하기. 학습과 추론/예측 과정이 다르므로 모델을 다시 설계해야 함.

In [None]:
# 인코더 모델 정의
# 학습 과정에서 사용한 인코더 그대로 재사용
encoder_model = Model(encoder_inputs, encoder_states)

In [None]:
# 디코더 모델 정의
# 추론/예측 시에는 디코더 입력으로 이전 타임 스텝의 디코더 출력(상태)을 사용함
# 이전 타임 스텝의 상태를 보관할 텐서
decoder_state_input_h = Input(shape=(hidden_units,))  # 입력층: 은닉 상태
decoder_state_input_c = Input(shape=(hidden_units,))  # 입력층: 셀 상태
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

# 학습 때 사용했던 임베딩 층을 재사용
dec_emb2= dec_emb_layer(decoder_inputs)

# 다음 단어 예측을 위해 이전 타임 스텝의 상태를 현 타임 스텝의 초기 상태로 사용
decoder_outputs2, state_h2, state_c2 = decoder_lstm(dec_emb2, initial_state=decoder_states_inputs)
decoder_states2 = [state_h2, state_c2]

# 모든 타임 스텝에 대해서 단어 예측
decoder_outputs2 = decoder_dense(decoder_outputs2)

# 디코더 모델
decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs2] + decoder_states2)

In [None]:
decoder_model.summary()

In [None]:
# 번역 실행을 위한 함수
def decode_sequence(input_seq, limit=100):
    # 입력으로부터 인코더의 상태를 얻음
    states_value = encoder_model.predict(input_seq)

    # <sos>에 해당하는 정수 생성
    target_seq = np.zeros((1,1))
    target_seq[0, 0] = tok_tar.word_index['<sos>']

    stop_condition = False
    decoded_sentence = ''

    # stop_condition이 True가 될 때까지 루프 반복
    while not stop_condition:
        # 이전 타임 스텝의 상태 states_value를 현 타임 스텝의 입력으로 사용
        # 최소 상태 값은 인코더의 마지막 상태 값
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

        # 예측 결과를 단어로 변환
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        if sampled_token_index != 0:
            sampled_word = tok_tar.index_word[sampled_token_index]
        else:
            sampled_word = '_'

        # 현재 타임 스텝의 예측 단어를 예측 문장에 추가
        decoded_sentence += ' '+sampled_word

        # <eos>에 도달하거나 정해진 길이를 넘으면 중단
        if (sampled_word == '<eos>' or len(decoded_sentence) > limit):
            stop_condition = True

        # 현재 타임 스텝의 예측 결과를 다음 타임 스텝의 입력으로 사용하기 위해 저장
        target_seq = np.zeros((1,1))
        target_seq[0, 0] = sampled_token_index

        # 현재 타임 스텝의 상태를 다음 타임 스텝의 입력으로 사용하기 위해 저장
        states_value = [h, c]

    return decoded_sentence

In [None]:
# 원문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq2src(input_seq):
    text = ''
    for i in input_seq:
        if(i != 0):
            text = text + tok_src.index_word[i] + ' '
    return text

# 번역문의 정수 시퀀스를 텍스트 시퀀스로 변환
def seq2tar(input_seq):
    text = ''
    for i in input_seq:
        if((i != 0 and i != tok_tar.word_index['<sos>']) and i != tok_tar.word_index['<eos>']):
            text = text + tok_tar.index_word[i] + ' '
    return text

In [None]:
# 학습 집합 내 샘플 문장에 대해 번역 결과 확인
sample_idx = [10, 100, 1000]
for i in sample_idx:
    input_seq = encoder_input_train[i: i+1]
    decoded_sentence = decode_sequence(input_seq)

    print('원문: ', seq2src(encoder_input_train[i]))
    print('번역문(정답):  ', seq2tar(decoder_input_train[i]))
    print('번역문(시스템): ', decoded_sentence[:-5])  # 마지막 부분의 '<eos>'는 제외하고 출력

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 217ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 138ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
원문:  Keep listening . 
번역문(정답):   계속 들어 . 
번역문(시스템):   계속 들어 . 
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 35ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 59ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 56ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 55ms/step
원문:  I m behind you . 
번역문(정답):   난 네 뒤에 있어 . 
번역문(시스템):   난 네 뒤에 있어 .

In [None]:
# 검증 집합 내 샘플 문장에 대해 번역 결과 확인
sample_idx = [100, 200, 300]
for i in sample_idx:
    input_seq = encoder_input_val[i: i+1]
    decoded_sentence = decode_sequence(input_seq)

    print('원문: ', seq2src(encoder_input_val[i]))
    print('번역문(정답):  ', seq2tar(decoder_input_val[i]))
    print('번역문(시스템): ', decoded_sentence[:-5])  # 마지막 부분의 '<eos>'는 제외하고 출력

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 57ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 75ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 54ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 63ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 62ms/step
원문:  Tom is smiling . 
번역문(정답):   톰은 웃고 있다 . 
번역문(시스템):   톰은 미소를 짓고 있다 . 
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 50ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 