In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt

import tensorflow as tf
import numpy as np

from sklearn.model_selection import train_test_split

import matplotlib.ticker as ticker
import matplotlib.pyplot as plt

import time
import re
import os
import io

In [2]:
%config InlineBackend.figure_format = 'retina'
 
import matplotlib.font_manager as fm
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager.findfont(font)

print("완료!")

완료!


In [3]:
enc_train_data = 'korean-english-park.train.ko'
dec_train_data = 'korean-english-park.train.en'

In [4]:
# 데이터 읽기
def load_data(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = file.readlines()
    return data

In [5]:
enc_data = load_data(enc_train_data)
dec_data = load_data(dec_train_data)

In [6]:
len(enc_data)

94123

In [7]:
# 특수기호 확인하는 함수
def find_special_char(data):
    # 한글과 숫자를 제외한 특수문자만 찾는 정규표현식
    pattern = r'[^가-힣0-9a-zA-Z\s]'
    
    special_chars = []
    
    # 리스트의 각 항목에 대해 특수기호를 찾음
    for text in data:
        if isinstance(text, str):  # 문자열인 경우에만 처리
            # 정규표현식을 통해 특수문자 추출
            special_chars.extend(re.findall(pattern, text))
    
    return special_chars

In [8]:
list(set(find_special_char(enc_data)))

['氷',
 '㎠',
 '地',
 '場',
 '修',
 '反',
 '平',
 'や',
 '愛',
 '董',
 '天',
 '夫',
 '管',
 '㎝',
 '會',
 '料',
 '島',
 '火',
 '赤',
 '四',
 '佛',
 'ㆍ',
 '~',
 '局',
 '戀',
 '丙',
 '淘',
 '英',
 '神',
 '故',
 '油',
 '兆',
 'か',
 '者',
 '器',
 '察',
 '輔',
 '㈜',
 '節',
 '８',
 '店',
 '鼓',
 '種',
 '進',
 '國',
 '州',
 '吳',
 '手',
 '克',
 '幹',
 '\uf0d7',
 '岩',
 '公',
 '㎞',
 '―',
 '電',
 '．',
 '輪',
 'し',
 '習',
 '{',
 '瀋',
 '?',
 '蓄',
 '敎',
 '靑',
 '“',
 '强',
 '泰',
 '難',
 '紅',
 '對',
 'ㄴ',
 '´',
 'い',
 '億',
 '送',
 '蔣',
 '藻',
 'é',
 '軍',
 '臺',
 '頂',
 '省',
 'そ',
 '金',
 '３',
 '切',
 '上',
 '±',
 '頭',
 '^',
 '`',
 '言',
 'て',
 ';',
 '銀',
 '２',
 '銅',
 '勝',
 '疫',
 '走',
 '燭',
 '室',
 '軌',
 '老',
 "'",
 '華',
 '光',
 'す',
 '邸',
 '球',
 '命',
 '照',
 '之',
 '迷',
 '麻',
 '駐',
 '霧',
 '˝',
 '輿',
 '北',
 'ま',
 '℃',
 '諡',
 '配',
 '效',
 '花',
 '濱',
 '蓋',
 '”',
 '毬',
 '!',
 '氣',
 '亞',
 ',',
 '#',
 '湖',
 '４',
 '買',
 '多',
 '峽',
 '時',
 '語',
 '毁',
 '‥',
 '中',
 '@',
 '父',
 '’',
 '男',
 '１',
 '新',
 '次',
 '李',
 '鄕',
 '屋',
 '因',
 '然',
 '路',
 '査',
 '海',
 '結',
 '懶',
 '懷',
 '奔',

In [9]:
list(set(find_special_char(dec_data)))

['½',
 '\\',
 'Î',
 '˝',
 'ç',
 'ó',
 '℃',
 '¸',
 '·',
 '”',
 'µ',
 '°',
 '!',
 ',',
 'Û',
 '#',
 '~',
 '<',
 '.',
 'â',
 '"',
 'Ñ',
 '¥',
 '%',
 '‘',
 '+',
 '@',
 '’',
 '—',
 '»',
 '–',
 'ñ',
 'Ù',
 'Á',
 '(',
 '®',
 '―',
 'ö',
 'À',
 '?',
 '“',
 '¦',
 'º',
 '¯',
 '´',
 'ø',
 'Ç',
 '=',
 '_',
 ']',
 '-',
 '>',
 '¡',
 ')',
 '¹',
 '…',
 '[',
 '¿',
 'Ï',
 '∼',
 '±',
 '^',
 '`',
 '$',
 'Ê',
 ';',
 ':',
 '*',
 '¨',
 '£',
 'æ',
 "'",
 'û',
 '&',
 '¾',
 '¢',
 '/',
 'Â']

In [10]:
def preprocess_sentence(sentence):

    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
    sentence = re.sub(r'[" "]+', " ", sentence)
    sentence = re.sub(r"[^가-힣0-9a-zA-Z?.!,]+", " ", sentence)

    sentence = sentence.strip()
    
    return sentence

In [11]:
enc_corpus = []
dec_corpus = []

num_examples = 30000

for kor in enc_data[:num_examples]:
    enc_corpus.append(preprocess_sentence(kor))
    
for eng in dec_data[:num_examples]:
    dec_corpus.append(preprocess_sentence(eng))

print("한국어:", enc_corpus[100])
print("영어:", dec_corpus[100])

한국어: 제 23차 연례 컴덱스 박람회의 개회사를 한 케이츠는 2년여전 기술 산업의 거품이 붕괴된 이후에 첨단 기술에 대해 부정적인 인식이 있다고 말했다 .
영어: Gates , who opened the 23rd annual Comdex trade show , said there was a negative perception of high tech following the collapse of the tech bubble about two years ago .


In [12]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import MeCab

In [13]:
mecab = MeCab.Tagger()

In [14]:
# 한국어 토크나이저
def encoder_tokenizer(encoder_texts):

    tokenized_texts = [" ".join(mecab.parse(sentence)) for sentence in encoder_texts]

    # Keras Tokenizer 생성 및 학습
    enc_tokenizer = Tokenizer(oov_token="<OOV>")
    enc_tokenizer.fit_on_texts(tokenized_texts)

    # 시퀀스로 변환
    encoder_sequences = enc_tokenizer.texts_to_sequences(tokenized_texts)

    # 패딩 적용
    encoder_input = pad_sequences(encoder_sequences, padding="post")

    return encoder_input, enc_tokenizer

In [15]:
#영어 토크나이저
def decoder_tokenizer(decoder_texts):

    START_TOKEN = "<start>"
    END_TOKEN = "<end>"

    # 문장별 토큰화 및 특수 토큰 추가
    tokenized_texts = [f"{START_TOKEN} {sentence.lower()} {END_TOKEN}" for sentence in decoder_texts]

    # Keras Tokenizer 생성 및 학습
    dec_tokenizer = Tokenizer(oov_token="<OOV>", filters='')
    dec_tokenizer.fit_on_texts(tokenized_texts)

    # 시퀀스로 변환
    decoder_sequences = dec_tokenizer.texts_to_sequences(tokenized_texts)

    # 패딩 적용
    decoder_input = pad_sequences(decoder_sequences, padding="post")

    return decoder_input, dec_tokenizer

In [16]:
# 예제 데이터
encoder_texts = ["안녕하세요 반갑습니다", "자연어 처리는 재미있어요"]
decoder_texts = ["hello nice to meet you", "nlp is fun"]

# 토크나이저 실행
encoder_input, encoder_vocab = encoder_tokenizer(encoder_texts)
decoder_input, decoder_vocab = decoder_tokenizer(decoder_texts)

print("Encoder Input Tensor:", encoder_input)
print("Decoder Input Tensor:", decoder_input)

Encoder Input Tensor: [[15 16  2  2  6 17 18  7 15 16 19 20 11 12  3 19 21  8  4  9  4  3  3 21
   8 35  2  3 36  4 22  7  4  9  4  3 37  4  9  5  8  4  3 23 24 12 25  7
  23 24 26 27 28  4  3  3 26 27 28  4 10 11]
 [13 14  5  2  2  6  3 13 14  5 22 10 38  9 10 39  2 40 13 14  2  2  6  5
   2  2  6 29 30  2  2  6 17 18  3 29 30 31 41 20  7 31 32 33 34 12 25  7
  32 33 34  5  8  4  3  3  5  8  4 10 11  0]]
Decoder Input Tensor: [[ 2  4  5  6  7  8  3]
 [ 2  9 10 11  3  0  0]]


In [17]:
enc_corpus, enc_tokenizer = encoder_tokenizer(enc_corpus)
dec_corpus, dec_tokenizer = decoder_tokenizer(dec_corpus)

In [18]:
enc_tokenizer

<keras_preprocessing.text.Tokenizer at 0x7d85e73ae4c0>

In [19]:
enc_corpus[0]

array([145,  27,   2, ...,   0,   0,   0], dtype=int32)

In [20]:
dec_tokenizer.index_word[6]

'<end>'

In [21]:
dec_corpus[0]

array([   5,  217,    8, 1231, 5860,   17,   43,   90,   73,  253,   47,
        341,    6,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0,    0,    0,    0,    0,    0,    0,
          0,    0,    0,    0,    0], dtype=int32)

In [22]:
enc_train, enc_val, dec_train, dec_val = train_test_split(enc_corpus, dec_corpus, test_size=0.2)

In [23]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_dec = tf.keras.layers.Dense(units)
        self.w_enc = tf.keras.layers.Dense(units)
        self.w_com = tf.keras.layers.Dense(1)
    
    def call(self, h_enc, h_dec):
        # h_enc shape: [batch x length x units]
        # h_dec shape: [batch x units]

        h_enc = self.w_enc(h_enc)
        h_dec = tf.expand_dims(h_dec, 1)
        h_dec = self.w_dec(h_dec)

        score = self.w_com(tf.nn.tanh(h_dec + h_enc))
        
        attn = tf.nn.softmax(score, axis=1)

        context_vec = attn * h_enc
        context_vec = tf.reduce_sum(context_vec, axis=1)

        return context_vec, attn

In [24]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(enc_units,
                                       return_sequences=True)
        
    def call(self, x):
        out = self.embedding(x)
        out = self.gru(out)
        
        return out

In [25]:
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(dec_units,
                                       return_sequences=True,
                                       return_state=True)
        self.fc = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, h_dec, enc_out):
        context_vec, attn = self.attention(enc_out, h_dec)

        out = self.embedding(x)
        out = tf.concat([tf.expand_dims(context_vec, 1), out], axis=-1)
        
        out, h_dec = self.gru(out)
        out = tf.reshape(out, (-1, out.shape[2]))
        out = self.fc(out)

        return out, h_dec, attn

In [26]:
BATCH_SIZE     = 16
SRC_VOCAB_SIZE = len(enc_tokenizer.index_word) + 1
TGT_VOCAB_SIZE = len(dec_tokenizer.index_word) + 1

units         = 512
embedding_dim = 256

encoder = Encoder(SRC_VOCAB_SIZE, embedding_dim, units)
decoder = Decoder(TGT_VOCAB_SIZE, embedding_dim, units)

In [27]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    
    return tf.reduce_mean(loss)

In [28]:
@tf.function
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    bsz = src.shape[0]
    loss = 0

    with tf.GradientTape() as tape:
        enc_out = encoder(src)
        h_dec = enc_out[:, -1]
        
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)

        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            loss += loss_function(tgt[:, t], pred)
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return batch_loss

In [29]:
from tqdm import tqdm
import random

In [30]:
@tf.function
def eval_step(src, tgt, encoder, decoder, dec_tok):
    bsz = src.shape[0]
    loss = 0

    enc_out = encoder(src)

    h_dec = enc_out[:, -1]
    
    dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)

    for t in range(1, tgt.shape[1]):
        pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

        loss += loss_function(tgt[:, t], pred)
        dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))
    
    return batch_loss

In [None]:
EPOCHS = 5

for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, enc_train.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (batch, idx) in enumerate(t):
        batch_loss = train_step(enc_train[idx:idx+BATCH_SIZE],
                                dec_train[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
    
        total_loss += batch_loss
        
        t.set_description_str('Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))
    
    test_loss = 0
    
    idx_list = list(range(0, enc_val.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (test_batch, idx) in enumerate(t):
        test_batch_loss = eval_step(enc_val[idx:idx+BATCH_SIZE],
                                    dec_val[idx:idx+BATCH_SIZE],
                                    encoder,
                                    decoder,
                                    dec_tokenizer)
    
        test_loss += test_batch_loss

        t.set_description_str('Test Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Test Loss %.4f' % (test_loss.numpy() / (test_batch + 1)))

Epoch  1: 100%|██████████| 1500/1500 [39:48<00:00,  1.59s/it, Loss 2.1376] 
Test Epoch  1: 100%|██████████| 375/375 [03:31<00:00,  1.77it/s, Test Loss 2.1375]
Epoch  2: 100%|██████████| 1500/1500 [37:24<00:00,  1.50s/it, Loss 2.1147]
Test Epoch  2: 100%|██████████| 375/375 [03:00<00:00,  2.07it/s, Test Loss 2.1514]
Epoch  3: 100%|██████████| 1500/1500 [37:25<00:00,  1.50s/it, Loss 2.1152]
Test Epoch  3: 100%|██████████| 375/375 [03:01<00:00,  2.07it/s, Test Loss 2.1629]
Epoch  4: 100%|██████████| 1500/1500 [37:24<00:00,  1.50s/it, Loss 2.1152]
Test Epoch  4: 100%|██████████| 375/375 [03:01<00:00,  2.07it/s, Test Loss 2.1568]
Epoch  5: 100%|██████████| 1500/1500 [37:23<00:00,  1.50s/it, Loss 2.1151]
Test Epoch  5:  65%|██████▌   | 244/375 [01:58<01:03,  2.06it/s, Test Loss 2.1747]

In [None]:
def evaluate(sentence, encoder, decoder):
    attention = np.zeros((dec_train.shape[-1], enc_train.shape[-1]))
    
    sentence = preprocess_sentence(sentence)
    inputs = enc_tokenizer.texts_to_sequences([sentence.split()])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                           maxlen=enc_train.shape[-1],
                                                           padding='post')

    result = ''

    enc_out = encoder(inputs)

    dec_hidden = enc_out[:, -1]
    dec_input = tf.expand_dims([dec_tokenizer.word_index['<start>']], 0)

    for t in range(dec_train.shape[-1]):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention[t] = attention_weights.numpy()

        predicted_id = \
        tf.argmax(tf.math.softmax(predictions, axis=-1)[0]).numpy()

        result += dec_tokenizer.index_word[predicted_id] + ' '

        if dec_tokenizer.index_word[predicted_id] == '<end>':
            return result, sentence, attention

        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention

In [None]:
def plot_attention(attention, sentence, predicted_sentence):
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    fontdict = {'fontsize': 14}

    ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
    ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict)

    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()


def translate(sentence, encoder, decoder):
    result, sentence, attention = evaluate(sentence, encoder, decoder)

    print('Input: %s' % (sentence))
    print('Predicted translation: {}'.format(result))
    
    attention = attention[:len(result.split()), :len(sentence.split())]
    plot_attention(attention, sentence.split(), result.split(' '))

In [None]:
translate("일곱 명의 사망자가 발생했다.", encoder, decoder)

In [None]:
### 이슈사항 정리
- 1. 번역 결과는 '>>>>>>>'과 같은 형태로 나옴

- 2. 어텐션 스코어가 대부분 동일한 값을 가지고 있음
[0.03348084]
[0.03345907]
[0.03342958]
[0.03339833]
[0.03337143]
[0.03335103]
[0.03333669]
[0.03332708]
[0.03332083]
[0.03331683]
[0.03331429]
[0.03331268]
[0.03331165]
[0.03331099]
[0.03331057]
[0.0333103 ]
[0.03331012]...
- 3. 토크나이저 내부 단어집합을 살펴보니 특수토큰이 별도로 지정되지 않고 분리되어 있음
[(1, '<'),
(2, '>'),
(3, 'the'),
(4, ','),
(5, '.'),
(6, 'end'),
(7, 'start'),
 ...
 
 - 취합하면 토크나이징 방법에 문제가 있을 수 있고 모델 구조도 함께 살펴볼 예정