# 한영 번역기 만들기

In [1]:
import tensorflow as tf
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split

import matplotlib.ticker as ticker
import matplotlib.pyplot as plt

import tarfile
import time
import re
import os
import io

print(tf.__version__)

2.6.0


## step 1. 데이터 다운로드


In [2]:

# 압축 파일 경로
compressed_file_path = "korean-english-park.train.tar.gz"

# 압축 해제할 폴더 경로
extracted_folder = "data" 

# 압축 파일 해제
with tarfile.open(compressed_file_path, "r:gz") as tar:
    tar.extractall(path=extracted_folder)

# 내부 파일 목록 확인
extracted_files = os.listdir(extracted_folder)
print("Extracted files:", extracted_files)


Extracted files: ['korean-english-park.train.ko', 'korean-english-park.train.en']


In [4]:
# 한국어 데이터 형태 확인

path_to_file = "data/korean-english-park.train.ko"
with open(path_to_file, "r") as f:
    kr_raw = f.read().splitlines()

print("Data Size:", len(kr_raw))
print("Example:")

for sen in kr_raw[0:100][::20]: print(">>", sen)

Data Size: 94123
Example:
>> 개인용 컴퓨터 사용의 상당 부분은 "이것보다 뛰어날 수 있느냐?"
>> 북한의 핵무기 계획을 포기하도록 하려는 압력이 거세지고 있는 가운데, 일본과 북한의 외교관들이 외교 관계를 정상화하려는 회담을 재개했다.
>> "경호 로보트가 침입자나 화재를 탐지하기 위해서 개인적으로, 그리고 전문적으로 사용되고 있습니다."
>> 수자원부 당국은 논란이 되고 있고, 막대한 비용이 드는 이 사업에 대해 내년에 건설을 시작할 계획이다.
>> 또한 근력 운동은 활발하게 걷는 것이나 최소한 20분 동안 뛰는 것과 같은 유산소 활동에서 얻는 운동 효과를 심장과 폐에 주지 않기 때문에, 연구학자들은 근력 운동이 심장에 큰 영향을 미치는지 여부에 대해 논쟁을 해왔다.


In [5]:
# 한국어 데이터 형태 확인

path_to_file = "data/korean-english-park.train.en"
with open(path_to_file, "r") as f:
    en_raw = f.read().splitlines()

print("Data Size:", len(kr_raw))
print("Example:")

for sen in en_raw[0:100][::20]: print(">>", sen)

Data Size: 94123
Example:
>> Much of personal computing is about "can you top this?"
>> Amid mounting pressure on North Korea to abandon its nuclear weapons program Japanese and North Korean diplomats have resumed talks on normalizing diplomatic relations.
>> “Guard robots are used privately and professionally to detect intruders or fire,” Karlsson said.
>> Authorities from the Water Resources Ministry plan to begin construction next year on the controversial and hugely expensive project.
>> Researchers also have debated whether weight-training has a big impact on the heart, since it does not give the heart and lungs the kind of workout they get from aerobic activities such as brisk walking or running for at least 20 minutes.


## 데이터 정제

In [6]:
en_raw[1]

'so a mention a few weeks ago about a rechargeable wireless optical mouse brought in another rechargeable, wireless mouse.'

In [7]:

# 중복을 제거하면서 동일한 인덱스의 데이터도 제거하기 위한 set 초기화
seen_items = set()
kr_corpus = []
en_corpus = []

# 중복 제거를 위한 반복문
for item1, item2 in zip(kr_raw, en_raw):
    if item1 not in seen_items:
        kr_corpus.append(item1)
        en_corpus.append(item2)
        seen_items.add(item1)

print("Cleaned list1:", len(kr_corpus))
print("Cleaned list2:", len(en_corpus))


Cleaned list1: 77591
Cleaned list2: 77591


In [8]:
from konlpy.tag import Mecab
from tensorflow.keras.preprocessing.text import Tokenizer

# 전처리 함수
def preprocess_sentence(sentence, lang, s_token=False, e_token=False):
    sentence = sentence.lower().strip()
    
    if lang == 'en':
        sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
        sentence = re.sub(r'[" "]+', " ", sentence)
        sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
    elif lang == 'ko':
        mecab = Mecab()
        sentence = ' '.join(mecab.morphs(sentence))
    
    sentence = sentence.strip()
    
    # decoder의 입력 문장과 라벨로 사용할 출력 문장에 꼭 필요
    if s_token:
        sentence = '<start> ' + sentence

    if e_token:
        sentence += ' <end>'
    
    return sentence

In [23]:
kr_corpus_cleaned = []
en_corpus_cleaned = []

for i in kr_corpus:
    kr_corpus_cleaned.append(preprocess_sentence(i, lang='kr', s_token=False, e_token=False))
    
for i in en_corpus:    
    en_corpus_cleaned.append(preprocess_sentence(i, lang='en', s_token=True, e_token=True))

In [24]:
print(kr_corpus_cleaned[1])
print(en_corpus_cleaned[1])

모든 광마우스와 마찬가지 로 이 광마우스도 책상 위에 놓는 마우스 패드를 필요로 하지 않는다.
<start> so a mention a few weeks ago about a rechargeable wireless optical mouse brought in another rechargeable , wireless mouse . <end>


In [25]:
# 토큰 길이가 40 이하인 데이터 선별
max_length = 40
en_corpus_filtered = []
kr_corpus_filtered = []

for item1, item2 in zip(kr_corpus_cleaned, en_corpus_cleaned):
    if len(item1) <= max_length:
        kr_corpus_filtered.append(item1)
        en_corpus_filtered.append(item2)

print("len of kr_corpus_filterted:", len(kr_corpus_filtered))
print("len of en_corpus_filterted:", len(en_corpus_filtered))

len of kr_corpus_filterted: 14848
len of en_corpus_filterted: 14848


In [38]:
print(kr_corpus_filtered[:3])
print(en_corpus_filtered[:3])

['개인용 컴퓨터 사용의 상당 부분은 "이것보다 뛰어날 수 있느냐?"', '그러나 이것은 또한 책상도 필요로 하지 않는다.', '많은 인질들이 화학 가스의 영향으로 고통을 겪으며 병원으로 옮겨졌다.']
['<start> much of personal computing is about can you top this ? <end>', '<start> like all optical mice , but it also doesn t need a desk . <end>', '<start> many captives were taken to hospital suffering from the effects of the chemical . <end>']


## step 3. 데이터 토큰화

In [26]:
def tokenize(corpus, lang):
    tokenizer = Tokenizer(filters='', lower=False)
    tokenizer.fit_on_texts(corpus)
    tensor = tokenizer.texts_to_sequences(corpus)
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='pre')

    return tensor, tokenizer

In [27]:
# 토큰화
kr_token, kr_tokenizer = tokenize(kr_corpus_filtered, lang='kr')
en_token, en_tokenizer = tokenize(en_corpus_filtered, lang='en')


In [28]:
kr_token[:5]

array([[    0,     0,     0,  4146,   296,  6358,  6359,  1911, 11823,
        11824,     6, 11825],
       [    0,     0,     0,     0,     0,     5,   240,    19, 11826,
         2977,   122,   110],
       [    0,     0,     0,    34,  4147,  6360, 11827,  6361,  2341,
        11828,   329,  2978],
       [    0,     0,     0,     0,     0,     0, 11829,   932, 11830,
         2979,  2342,  1912],
       [    0,     0,     0,     0,     0,     0,     0,     0,     0,
            0,  6362,   599]], dtype=int32)

## Step 4. 모델 설계

In [43]:
# attention
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_dec = tf.keras.layers.Dense(units)
        self.w_enc = tf.keras.layers.Dense(units)
        self.w_com = tf.keras.layers.Dense(1)
    
    def call(self, h_enc, h_dec):
        # h_enc shape: [batch x length x units]
        # h_dec shape: [batch x units]

        h_enc = self.w_enc(h_enc)
        h_dec = tf.expand_dims(h_dec, 1)
        h_dec = self.w_dec(h_dec)

        score = self.w_com(tf.nn.tanh(h_dec + h_enc))
        
        attn = tf.nn.softmax(score, axis=1)

        context_vec = attn * h_enc
        context_vec = tf.reduce_sum(context_vec, axis=1)

        return context_vec, attn

In [44]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        # TODO: Awesome Encoder Modules
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(enc_units, return_sequences=True)

    def call(self, x):
        # TODO: Awesome Process
        out = self.embedding(x)
        out = self.gru(out)
        
        return out

In [45]:

class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(dec_units,
                                       return_sequences=True,
                                       return_state=True)
        self.fc = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, h_dec, enc_out):
        context_vec, attn = self.attention(enc_out, h_dec)

        out = self.embedding(x)
        out = tf.concat([tf.expand_dims(context_vec, 1), out], axis=-1)
        
        out, h_dec = self.gru(out)
        out = tf.reshape(out, (-1, out.shape[2]))
        out = self.fc(out)

        return out, h_dec, attn

In [46]:

BATCH_SIZE     = 64
SRC_VOCAB_SIZE = len(kr_tokenizer.index_word) + 1
TGT_VOCAB_SIZE = len(en_tokenizer.index_word) + 1

units         = 1024
embedding_dim = 512

encoder = Encoder(SRC_VOCAB_SIZE, embedding_dim, units)
decoder = Decoder(TGT_VOCAB_SIZE, embedding_dim, units)

# sample input
sequence_len = 30

sample_enc = tf.random.uniform((BATCH_SIZE, sequence_len))
sample_output = encoder(sample_enc)

print ('Encoder Output:', sample_output.shape)

sample_state = tf.random.uniform((BATCH_SIZE, units))

sample_logits, h_dec, attn = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                     sample_state, sample_output)

print ('Decoder Output:', sample_logits.shape)
print ('Decoder Hidden State:', h_dec.shape)
print ('Attention:', attn.shape)

Encoder Output: (64, 30, 1024)
Decoder Output: (64, 16439)
Decoder Hidden State: (64, 1024)
Attention: (64, 30, 1)


## Step 5. 훈련하기


In [47]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy( # 모델이 생성한 확률 분포와 정수 인덱스 비교해 cross entropy 계산
    from_logits=True, reduction='none') # from_logits=True: softmax 거치지 않고 모델 출력 그대로 전달

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss.dtype) # 마스킹 통해 패딩 많은 문제 해결
    loss *= mask
    
    return tf.reduce_mean(loss)

In [48]:
@tf.function # 훈련 외적 연산을 gpu에서 동작시켜 훈련 가속
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    bsz = src.shape[0]
    loss = 0

    with tf.GradientTape() as tape: #모든 미분 연산 기록
        enc_out = encoder(src) # context vector 생성
        h_dec = enc_out[:, -1] # t=0일때 final hidden state
        
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)
        
        # <start> 문장과 enc_out, hiddenstate를 기반으로 다음 단어 예측
        # 예측 단어와 정답간 loss 구한 뒤 t=1의 정답 단어를 다음 입력으로 사용
        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            loss += loss_function(tgt[:, t], pred)
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return batch_loss

In [53]:
import random

EPOCHS = 100

be_total_loss = 99999
for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, kr_token.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)
    for (batch, idx) in enumerate(t):
        batch_loss = train_step(kr_token[idx:idx+BATCH_SIZE],
                                en_token[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                en_tokenizer)
        
        total_loss += batch_loss

        t.set_description_str('Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))
    
    
    if be_total_loss < (total_loss / (batch + 1)):
        break
    else:
        be_total_loss = (total_loss / (batch + 1))

Epoch  1: 100%|██████████| 232/232 [03:17<00:00,  1.18it/s, Loss 1.6668]
Epoch  2: 100%|██████████| 232/232 [02:02<00:00,  1.89it/s, Loss 1.6229]
Epoch  3: 100%|██████████| 232/232 [02:02<00:00,  1.89it/s, Loss 1.6282]


In [54]:
# # 모델 저장
encoder.save_weights('./encoder_weights_50000_ep1')
decoder.save_weights('./decoder_weights_50000_ep1')

In [55]:
def preprocessing_eval(txt):
    sentence = txt.strip()                                         # 문장의 양쪽 공백 제거
    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)                   # 특수 문자 및 구두점 주변에 공백 추가
    sentence = re.sub(r'[" "]+', " ", sentence)                         # 여러 개의 공백을 하나의 공백으로 대체
    sentence = re.sub(r"[^ㄱ-ㅎㅏ-ㅣ가-힣a-zA-Z?.!,]+", " ", sentence)  # 한글 및 영어 이외의 문자는 공백으로 대체
    sentence = sentence.strip()                                         # 다시 양쪽 공백 제거

    return sentence

In [58]:
def evaluate(sentence, encoder, decoder):
    attention = np.zeros((en_token.shape[-1], kr_token.shape[-1]))
    
    sentence = preprocessing_eval(sentence)
    inputs = kr_tokenizer.texts_to_sequences([sentence.split()])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                           maxlen=kr_token.shape[-1],
                                                           padding='post')

    result = ''

    enc_out = encoder(inputs)

    dec_hidden = enc_out[:, -1]
    dec_input = tf.expand_dims([en_tokenizer.word_index['<start>']], 0)

    for t in range(en_token.shape[-1]):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention[t] = attention_weights.numpy()

        predicted_id = \
        tf.argmax(tf.math.softmax(predictions, axis=-1)[0]).numpy()

        result += en_tokenizer.index_word[predicted_id] + ' '

        if en_tokenizer.index_word[predicted_id] == '<end>':
            return result, sentence, attention

        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention

import seaborn as sns
sns.set(font='NanumGothic')

def plot_attention(attention, sentence, predicted_sentence):
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    ax.set_xticklabels([''] + sentence, rotation=90)
    ax.set_yticklabels([''] + predicted_sentence)

    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()


def translate(sentence, encoder, decoder):
    result, sentence, attention = evaluate(sentence, encoder, decoder)

    print('Input: %s' % (sentence))
    print('Output: {}'.format(result))
    
    attention = attention[:len(result.split()), :len(sentence.split())]
    #plot_attention(attention, sentence.split(), result.split(' '))

'''
K1) 오바마는 대통령이다.
K2) 시민들은 도시 속에 산다.
K3) 커피는 필요 없다.
K4) 일곱 명의 사망자가 발생했다.
'''
translate("오바마는 대통령이다.", encoder, decoder)
translate("시민들은 도시 속에 산다.", encoder, decoder)
translate("커피는 필요 없다.", encoder, decoder)
translate("일곱 명의 사망자가 발생했다.", encoder, decoder)

Input: 오바마는 대통령이다 .
Output: <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> 
Input: 시민들은 도시 속에 산다 .
Output: <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <start> <sta