In [1]:
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
import time
import re
import os
import io

%config InlineBackend.figure_format = 'retina'
 
import matplotlib.font_manager as fm
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager.findfont(font)

print("완료!")

완료!


## 1. 데이터 다운로드

In [10]:
# 파일 다운로드
path_to_zip = tf.keras.utils.get_file(
    'korean-english-park.train.tar.gz',  # 저장될 파일명
    origin='https://github.com/jungyeul/korean-parallel-corpora/raw/master/korean-english-news-v1/korean-english-park.train.tar.gz',
    extract=True  # 압축 해제 옵션
)

# 압축이 해제된 디렉토리 경로 확인
extracted_dir = os.path.dirname(path_to_zip)

# 압축 파일 목록 확인
with tarfile.open(path_to_zip, "r:gz") as tar:
    tar_list = tar.getnames()  # tar 파일 내 파일 목록을 가져옴
    print("압축 파일 목록:", tar_list)

압축 파일 목록: ['korean-english-park.train.en', 'korean-english-park.train.ko']


In [11]:
# tar 파일 내의 파일 경로 확인 후, 각 파일에 대한 경로 설정
path_to_file_en = os.path.join(extracted_dir, "korean-english-park.train.en")
path_to_file_ko = os.path.join(extracted_dir, "korean-english-park.train.ko")

# 각각의 파일을 읽음
with open(path_to_file_en, "r", encoding='utf-8') as f_en, open(path_to_file_ko, "r", encoding='utf-8') as f_ko:
    raw_data_en = f_en.read().splitlines()
    raw_data_ko = f_ko.read().splitlines()

# 데이터 크기 및 예시 출력
print("English Data Size:", len(raw_data_en))
print("Korean Data Size:", len(raw_data_ko))

# 데이터 샘플 출력 (100개 중 20개씩 출력)
print("English Example:")
for sen in raw_data_en[0:100][::20]:
    print(">>", sen)

print("Korean Example:")
for sen in raw_data_ko[0:100][::20]:
    print(">>", sen)

English Data Size: 94123
Korean Data Size: 94123
English Example:
>> Much of personal computing is about "can you top this?"
>> Amid mounting pressure on North Korea to abandon its nuclear weapons program Japanese and North Korean diplomats have resumed talks on normalizing diplomatic relations.
>> “Guard robots are used privately and professionally to detect intruders or fire,” Karlsson said.
>> Authorities from the Water Resources Ministry plan to begin construction next year on the controversial and hugely expensive project.
>> Researchers also have debated whether weight-training has a big impact on the heart, since it does not give the heart and lungs the kind of workout they get from aerobic activities such as brisk walking or running for at least 20 minutes.
Korean Example:
>> 개인용 컴퓨터 사용의 상당 부분은 "이것보다 뛰어날 수 있느냐?"
>> 북한의 핵무기 계획을 포기하도록 하려는 압력이 거세지고 있는 가운데, 일본과 북한의 외교관들이 외교 관계를 정상화하려는 회담을 재개했다.
>> "경호 로보트가 침입자나 화재를 탐지하기 위해서 개인적으로, 그리고 전문적으로 사용되고 있습니다."
>> 수자원부 당국은 논란이 되고 있고, 막대한 비용

In [12]:
# 데이터를 enc_corpus (영어)와 dec_corpus (한국어)에 저장
enc_corpus = raw_data_en
dec_corpus = raw_data_ko

print("Example English Sentence:", enc_corpus[0])
print("Example Korean Sentence:", dec_corpus[0])

Example English Sentence: Much of personal computing is about "can you top this?"
Example Korean Sentence: 개인용 컴퓨터 사용의 상당 부분은 "이것보다 뛰어날 수 있느냐?"


## 2. 데이터 정제

In [13]:
from konlpy.tag import Mecab

# Mecab 초기화
mecab = Mecab()

# 1. 중복된 데이터 제거
# 영어-한국어 병렬 데이터를 중복을 제거한 상태로 저장
cleaned_corpus = list(set(zip(raw_data_en, raw_data_ko)))

In [14]:
# 2. 정규식 기반 한글/영어 전처리 함수 정의
def preprocess_sentence(sentence, lang="en"):
    if lang == "en":
        # 영문 전처리 (소문자 변환, 특수문자 제거)
        sentence = sentence.lower().strip()
        sentence = re.sub(r"([?.!,¿])", r" \1 ", sentence)
        sentence = re.sub(r"[^a-zA-Z?.!,¿]+", r" ", sentence)
        sentence = re.sub(r"\s+", " ", sentence)
    else:
        # 한글 전처리 (특수문자 제거)
        sentence = re.sub(r"[^가-힣0-9\s]+", "", sentence)  # 한글, 숫자, 공백 외 제거
        sentence = re.sub(r"\s+", " ", sentence).strip()    # 중복 공백 제거

    return sentence

In [15]:
# 3. 영어에 <start>와 <end> 토큰 추가, 한글은 mecab 토큰화
eng_corpus = []
kor_corpus = []

for eng_sentence, kor_sentence in cleaned_corpus:
    # 영어 문장 전처리 + <start>, <end> 토큰 추가
    eng_sentence = preprocess_sentence(eng_sentence, lang="en")
    eng_sentence = "<start> " + eng_sentence + " <end>"
    eng_corpus.append(eng_sentence.split())  # 토큰화하여 저장

    # 한국어 문장 전처리 + Mecab 토큰화
    kor_sentence = preprocess_sentence(kor_sentence, lang="ko")
    kor_corpus.append(mecab.morphs(kor_sentence))  # Mecab을 사용한 토큰화

In [16]:
# 4. 길이가 40 이하인 데이터만 선별
final_eng_corpus = []
final_kor_corpus = []

for eng_tokens, kor_tokens in zip(eng_corpus, kor_corpus):
    if len(eng_tokens) <= 40 and len(kor_tokens) <= 40:  # 토큰 길이가 40 이하인 경우
        final_eng_corpus.append(eng_tokens)
        final_kor_corpus.append(kor_tokens)

# 데이터 확인
print("총 데이터 샘플 수 (중복 제거 후):", len(cleaned_corpus))
print("최종 영어 코퍼스 크기:", len(final_eng_corpus))
print("최종 한국어 코퍼스 크기:", len(final_kor_corpus))

총 데이터 샘플 수 (중복 제거 후): 78968
최종 영어 코퍼스 크기: 63863
최종 한국어 코퍼스 크기: 63863


In [17]:
# 샘플 출력
print("Example English Sentence:", final_eng_corpus[0])
print("Example Korean Sentence:", final_kor_corpus[0])

Example English Sentence: ['<start>', 'barcelona', 'vice', 'president', 'ferran', 'soriano', 'told', 'radio', 'marca', '<end>']
Example Korean Sentence: ['앙리', '29', '는', '바르셀로나', '구단', '과', '4', '년', '간', '약', '299', '억', '원', '에', '계약', '하', '게', '된다']


## 3. 데이터 토큰화

In [18]:
def tokenize(corpus, vocab_size):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=vocab_size, filters='', oov_token='<OOV>')
    tokenizer.fit_on_texts(corpus)

    tensor = tokenizer.texts_to_sequences(corpus)
    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')

    return tensor, tokenizer

# 단어의 수
vocab_size_en = 10000
vocab_size_ko = 10000

In [28]:
# 영어와 한국어 코퍼스를 각각 tokenize 함수에 적용
eng_tensor, enc_tokenizer = tokenize(final_eng_corpus, vocab_size_en)
kor_tensor, dec_tokenizer = tokenize(final_kor_corpus, vocab_size_ko)

# 결과 확인
print("영어 텐서의 크기:", eng_tensor.shape)
print("한국어 텐서의 크기:", kor_tensor.shape)

영어 텐서의 크기: (63863, 40)
한국어 텐서의 크기: (63863, 40)


In [29]:
# 토큰화된 첫 번째 예시 출력
print("Example English Tensor:", eng_tensor[0])
print("Example Korean Tensor:", kor_tensor[0])

Example English Tensor: [   4 3027  813   54    1    1   84  999    1    5    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0]
Example Korean Tensor: [5415  621    4 2848 4817   22   94   36   97  133    1  163  226    8
 1054   11   42  167    0    0    0    0    0    0    0    0    0    0
    0    0    0    0    0    0    0    0    0    0    0    0]


In [30]:
print("영어 tokenizer 단어 인덱스 개수:", len(eng_tokenizer.word_index))
print("한국어 tokenizer 단어 인덱스 개수:", len(kor_tokenizer.word_index))

영어 tokenizer 단어 인덱스 개수: 38831
한국어 tokenizer 단어 인덱스 개수: 39699


## 4. 모델 설계

In [31]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_dec = tf.keras.layers.Dense(units)
        self.w_enc = tf.keras.layers.Dense(units)
        self.w_com = tf.keras.layers.Dense(1)
    
    def call(self, h_enc, h_dec):
        # h_enc shape: [batch x length x units]
        # h_dec shape: [batch x units]

        h_enc = self.w_enc(h_enc)
        h_dec = tf.expand_dims(h_dec, 1)
        h_dec = self.w_dec(h_dec)

        score = self.w_com(tf.nn.tanh(h_dec + h_enc))
        
        attn = tf.nn.softmax(score, axis=1)

        context_vec = attn * h_enc
        context_vec = tf.reduce_sum(context_vec, axis=1)

        return context_vec, attn

In [35]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units, dropout_rate=0.3):
        super(Encoder, self).__init__()
        
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(enc_units,
                                       return_sequences=True,
                                       return_state=True)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)  # Dropout 추가
        
    def call(self, x):
        out = self.embedding(x)
        out = self.dropout(out)  # Dropout 적용
        out, state = self.gru(out)
        
        return out, state

In [44]:
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units, dropout_rate=0.3):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(dec_units,
                                       return_sequences=True,
                                       return_state=True)
        self.fc = tf.keras.layers.Dense(vocab_size)

        self.attention = BahdanauAttention(self.dec_units)
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, h_dec, enc_out):
        context_vec, attn = self.attention(enc_out, h_dec)

        # context_vec 차원을 반복해서 seq_len과 맞추기
        seq_len = tf.shape(x)[1]  # 입력 시퀀스 길이
        context_vec = tf.expand_dims(context_vec, 1)  # [batch_size, 1, units]
        context_vec = tf.repeat(context_vec, seq_len, axis=1)  # [batch_size, seq_len, units]

        out = self.embedding(x)
        out = tf.concat([context_vec, out], axis=-1)  # 차원 맞춘 후 concat
        
        out = self.dropout(out)  # Dropout 적용
        out, h_dec = self.gru(out)
        out = tf.reshape(out, (-1, out.shape[2]))
        out = self.fc(out)

        return out, h_dec, attn

In [45]:
BATCH_SIZE     = 64
SRC_VOCAB_SIZE = len(enc_tokenizer.index_word) + 1
TGT_VOCAB_SIZE = len(dec_tokenizer.index_word) + 1

units         = 1024
embedding_dim = 512
dropout_rate  = 0.3

encoder = Encoder(SRC_VOCAB_SIZE, embedding_dim, units)
decoder = Decoder(TGT_VOCAB_SIZE, embedding_dim, units)

# sample input
sequence_len = 30
sample_enc = tf.random.uniform((BATCH_SIZE, sequence_len))

# 인코더에서 두 가지 출력 (output과 state)을 받아줌
sample_output, sample_state = encoder(sample_enc)

# Encoder 출력 확인
print('Encoder Output:', sample_output.shape)  # 첫 번째 반환값
print('Encoder Hidden State:', sample_state.shape)  # 두 번째 반환값

sample_state = tf.random.uniform((BATCH_SIZE, units))  # decoder의 state 초기화
sample_logits, h_dec, attn = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                     sample_state, sample_output)

# Decoder 출력 확인
print('Decoder Output:', sample_logits.shape)
print('Decoder Hidden State:', h_dec.shape)
print('Attention:', attn.shape)

Encoder Output: (64, 30, 1024)
Encoder Hidden State: (64, 1024)
Decoder Output: (64, 39700)
Decoder Hidden State: (64, 1024)
Attention: (64, 30, 1)


## 5. 훈련하기

In [49]:
from tqdm import tqdm    # tqdm
import random

enc_train = eng_tensor 
dec_train = kor_tensor 
# Optimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)#학습률조정
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask
    
    return tf.reduce_mean(loss)

In [51]:
# 훈련 과정 정의 (EPOCHS)
EPOCHS = 5

# train_step 함수 정의 (필수)
@tf.function
def train_step(enc_inp, dec_inp, encoder, decoder, optimizer, dec_tokenizer):
    loss = 0

    with tf.GradientTape() as tape:
        # Encoder forward pass
        enc_out, enc_hidden = encoder(enc_inp)

        # Decoder 초기 상태는 인코더의 최종 hidden state로 시작
        dec_hidden = enc_hidden

        # <start> 토큰을 Decoder의 첫 입력으로
        dec_input = dec_inp[:, :-1]  # 마지막 <end> 토큰 제외
        dec_target = dec_inp[:, 1:]  # 첫 <start> 토큰 제외

        # 디코더를 통해 결과 예측
        predictions, _, _ = decoder(dec_input, dec_hidden, enc_out)

        # 손실 계산
        # 패딩을 무시하도록 손실 계산
        mask = tf.math.not_equal(dec_target, 0)  # 패딩인 부분은 False, 나머지는 True
        loss = tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(dec_target, predictions, from_logits=True))

    # 그라디언트 계산 및 적용
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))

    return loss

In [52]:
for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, enc_train.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)    # tqdm

    for (batch, idx) in enumerate(t):
        batch_loss = train_step(enc_train[idx:idx+BATCH_SIZE],
                                dec_train[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
    
        total_loss += batch_loss
        
        t.set_description_str('Epoch %2d' % (epoch + 1))    # tqdm
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))    # tqdm

Epoch  1: 100%|██████████| 998/998 [04:10<00:00,  3.99it/s, Loss 4.4887]
Epoch  2: 100%|██████████| 998/998 [04:07<00:00,  4.03it/s, Loss 4.4731]
Epoch  3: 100%|██████████| 998/998 [04:08<00:00,  4.02it/s, Loss 4.4732]
Epoch  4: 100%|██████████| 998/998 [04:09<00:00,  4.01it/s, Loss 4.4732]
Epoch  5: 100%|██████████| 998/998 [04:08<00:00,  4.02it/s, Loss 4.4732]


In [56]:
def evaluate(sentence, encoder, decoder):
    attention = np.zeros((dec_train.shape[-1], enc_train.shape[-1]))
    
    sentence = preprocess_sentence(sentence)
    inputs = enc_tokenizer.texts_to_sequences([sentence.split()])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                           maxlen=enc_train.shape[-1],
                                                           padding='post')

    result = ''

    enc_out = encoder(inputs)

    dec_hidden = enc_out[:, -1]
    dec_input = tf.expand_dims([dec_tokenizer.word_index['<start>']], 0)

    for t in range(dec_train.shape[-1]):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention[t] = attention_weights.numpy()

        predicted_id = \
        tf.argmax(tf.math.softmax(predictions, axis=-1)[0]).numpy()

        result += dec_tokenizer.index_word[predicted_id] + ' '

        if dec_tokenizer.index_word[predicted_id] == '<end>':
            return result, sentence, attention

        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention

In [57]:
def plot_attention(attention, sentence, predicted_sentence):
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    fontdict = {'fontsize': 14}

    ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
    ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict)

    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()

## 회고

1. 번역기 모델 처음 만들어보는데 양쪽 언어 다 전처리하는게 좀 귀찮음
2. loss값 너무 높게 나와서 깜짝 놀람. 이유가 뭘까
   (1)단어 수가 너무 많아서? -> 6만개에서 한 절반만 써볼까?
   (2)learning rate를 좀 더 낮춰볼까?(adam 기본 0.001이니 0.0001로) (3)정답 데이터에 패딩이 포함되어 있으면 손실이 많아지니까 패딩 무시해볼까?
3. (2),(3) 적용했더니 4.6에서 4.4로 아주 조금 낮아짐