- 데이터 전처리
- 모델 설계
    - Bahdanau Attention을 사용
- 모델 훈련
    - Optimizer & Loss -Encoder-Decoder 구조 정의
    - train_step 구현
    - 학습 진행 및 시각화

In [None]:
# 한국어 말뭉치 사용을 위한 한국어를 지원하는 폰트로 변경
import matplotlib as mpl
import matplotlib.pyplot as plt
 
%config InlineBackend.figure_format = 'retina'
 
import matplotlib.font_manager as fm
fontpath = '/usr/share/fonts/truetype/nanum/NanumBarunGothic.ttf'
font = fm.FontProperties(fname=fontpath, size=9)
plt.rc('font', family='NanumBarunGothic') 
mpl.font_manager.findfont(font)

### 데이터 전처리
#### 데이터 준비하기

In [None]:
import tensorflow as tf
import numpy as np

from sklearn.model_selection import train_test_split

import matplotlib.ticker as ticker
import matplotlib.pyplot as plt

import time
import re
import os
import io

print(tf.__version__)

데이터 다운로드에 tf.keras.utils.get_file() 함수 사용
- get_file()함수는 URL로부터 데이터를 다운받고, 압축된 형식일 경우 해제해줌  


스페인어-영어 언어 데이터 세트가 포함 된 ZIP 파일을 다운로드하고 압축을 해제 한 다음, 추출 된 내용에서 특정 텍스트 파일('spa.txt')의 경로를 지정

In [None]:
path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip', # 다운로드 한 zip 파일에 지정할 이름
    origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip', #  파일이 다운로드 될 UR
    extract=True) # 압축을 해제

path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"

In [None]:
# 데이터 확인
with open(path_to_file, "r") as f:
    raw = f.read().splitlines()

print("Data Size:", len(raw))
print("Example:")

for sen in raw[0:100][::20]: print(">>", sen)

데이터는 \t 기호를 기준으로 영어와 스페인어가 병렬 쌍을 이루고 있음 


#### 데이터 전처리: 정제하기
- !와 같은 불필요한 기호등을 삭제함 
- Decoder는 첫 입력으로 사용할 시작 토큰과 문장생성 종료를 알리는 끝 토큰이 반드시 필요하기 때문에 <start>, <end>를 붙여줌

In [None]:
def preprocess_sentence(sentence, s_token=False, e_token=False):
    sentence = sentence.lower().strip() # 소문자 변환, 양쪽 공백 제거

    sentence = re.sub(r"([?.!,])", r" \1 ", sentence) # 문장 내의 구두점 앞뒤에 공백을 추가
    sentence = re.sub(r'[" "]+', " ", sentence) # 연속된 여러 개의 공백을 하나의 공백으로 축소
    sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence) #  영문자와 구두점(. ? ! ,) 이외의 모든 문자를 공백으로 대체

    sentence = sentence.strip()

    if s_token: # true
        sentence = '<start> ' + sentence # 문장 시작 부분에 <start> 토큰을 추가

    if e_token: # true
        sentence += ' <end>' # 문장 끝 부분에 <end> 토큰을 추가
    
    return sentence #전처리된 문장을 반환 

In [None]:
# 영어와 스페인어의 전처리된 문장 코퍼스(corpus)를 만드는 과정
enc_corpus = [] # 영어
dec_corpus = [] # 스페인어

num_examples = 30000 # 데이터는 상위 3만 개만 사용

for pair in raw[:num_examples]:
    eng, spa = pair.split("\t") # 각 문장 쌍을 탭('\t')을 기준으로 분리

    enc_corpus.append(preprocess_sentence(eng)) # 영어 전처리
    dec_corpus.append(preprocess_sentence(spa, s_token=True, e_token=True)) # 스페인어 전처리, 문장 시작과 끝에 <start>, <end> 토큰 붙임

# 100번째 문장 출력해서 확인
print("English:", enc_corpus[100])   # go away !
print("Spanish:", dec_corpus[100])   # <start> salga de aqu ! <end>

#### 데이터 전처리: 토큰화
- 정제된 텍스트를 tokenize() 함수를 사용해 토큰화하고 텐서로 변환
- 변환된 텐서를 80%의 훈련 데이터와 20%의 검증 데이터로 분리

In [None]:
def tokenize(corpus):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    tokenizer.fit_on_texts(corpus)

    tensor = tokenizer.texts_to_sequences(corpus) # 텍스트를 정수 시퀀스로 변환

    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post') # 패딩 적용

    return tensor, tokenizer # 토큰화된 시퀀스와 학습된 토크나이저를 반환

In [None]:
from sklearn.model_selection import train_test_split

# 훈련 데이터와 검증 데이터로 분리
enc_tensor, enc_tokenizer = tokenize(enc_corpus)
dec_tensor, dec_tokenizer = tokenize(dec_corpus)

enc_train_tensor, enc_val_tensor, dec_train_tensor, dec_val_tensor = train_test_split(enc_tensor, dec_tensor, test_size=0.2, random_state=42)

# 확인을 위한 출력
print("훈련 데이터의 크기:", len(enc_train_tensor))
print("검증 데이터의 크기:", len(enc_val_tensor))

### 모델 설계
- Encoder는 모든 Time-Step의 Hidden State를 출력으로 갖고, Decoder는 Encoder의 출력과 Decoder의 t-1 Step의 Hidden State로 Attention을 취하여 t Step의 Hidden State를 만들어 냄
- Decoder에서 t Step의 단어로 예측된 것을 실제 정답과 대조해 Loss를 구하고, 생성된 t Step의 Hidden State는 t+1 Step의 Hidden State를 만들기 위해 다시 Decoder에 전달함
- Bahdanau Attention : 입력 시퀀스의 각 위치마다 가중치를 학습하여 중요도를 결정

In [None]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_dec = tf.keras.layers.Dense(units)# 디코더의 hidden state를 처리
        self.w_enc = tf.keras.layers.Dense(units)# 인코더의 hidden state를 처리
        self.w_com = tf.keras.layers.Dense(1) # 어텐션 가중치를 결합하는 데 사용되는 밀집 레이어
    
    def call(self, h_enc, h_dec):
        # h_enc shape: [batch x length x units]
        # h_dec shape: [batch x units]

        h_enc = self.w_enc(h_enc) # 인코더 hidden state에 대한 가중치를 계산
        h_dec = tf.expand_dims(h_dec, 1) # 디코더 hidden state에 차원을 추가하여 broadcasting이 가능하도록 함
        h_dec = self.w_dec(h_dec) # 디코더 hidden state에 대한 가중치를 계산

        score = self.w_com(tf.nn.tanh(h_dec + h_enc)) # 어텐션 스코어를 계산
        #  디코더 hidden state와 인코더 hidden state 간의 유사도를 나타냄
        
        attn = tf.nn.softmax(score, axis=1) # 각 인코더의 hidden state에 대한 가중치가 얻어냄
  
        context_vec = attn * h_enc # 어텐션 가중치를 사용하여 인코더 hidden state를 가중 평균
        context_vec = tf.reduce_sum(context_vec, axis=1) # 시퀀스 길이에 따라 가중 평균을 합산하여 컨텍스트 벡터를 계산

        return context_vec, attn # 계산된 컨텍스트 벡터와 어텐션 가중치를 반환

In [None]:
class Encoder(tf.keras.Model):
    # vocab_size: 어휘 사전의 크기, 즉 모델이 다루는 고유 단어의 수
    # embedding_dim: 임베딩 차원, 즉 각 단어를 나타내는 벡터의 크기
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        
        self.enc_units = enc_units # enc_units: GRU 레이어의 유닛 수, 즉 GRU 레이어의 출력 차원
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim) # 주어진 단어 인덱스를 임베딩 벡터로 변환
        self.gru = tf.keras.layers.GRU(enc_units,
                                       return_sequences=True) # 모든 시점의 출력을 반환하도록 설정
        
    def call(self, x):
        out = self.embedding(x)
        out = self.gru(out)
        
        return out

In [None]:
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(dec_units,
                                       return_sequences=True,
                                       return_state=True)
        # 출력 레이어로, 디코더의 출력을 어휘 크기에 맞게 변환
        self.fc = tf.keras.layers.Dense(vocab_size)
        
        # Bahdanau 어텐션 메커니즘을 사용하는 어텐션 레이어를 생성
        self.attention = BahdanauAttention(self.dec_units)

    def call(self, x, h_dec, enc_out):
        # h_dec : 디코더의 이전 시점의 hidden state
        # enc_out: 인코더의 출력으로, 모든 시점에서의 인코더의 hidden state
        
        #  Bahdanau 어텐션 레이어를 사용하여 현재 디코더의 hidden state와 인코더의 hidden state 간의 어텐션 가중치 및 컨텍스트 벡터를 계산
        context_vec, attn = self.attention(enc_out, h_dec) 

        # 임베딩 및 어텐션 가중치 추가
        out = self.embedding(x)
        # 어텐션 가중치를 현재 입력에 추가
        out = tf.concat([tf.expand_dims(context_vec, 1), out], axis=-1)
        
        # GRU 레이어 실행
        #어텐션을 추가한 입력을 GRU 레이어에 전달하여 디코더의 현재 시점의 출력과 hidden state를 얻음
        out, h_dec = self.gru(out)
        
        # 출력 레이어 젹용
        out = tf.reshape(out, (-1, out.shape[2]))
        out = self.fc(out) # 최종 출력 생성

        return out, h_dec, attn

In [None]:
# 하이퍼파라미터 정의
BATCH_SIZE     = 64
SRC_VOCAB_SIZE = len(enc_tokenizer.index_word) + 1
TGT_VOCAB_SIZE = len(dec_tokenizer.index_word) + 1

units         = 1024
embedding_dim = 512

# 인코더 및 디코더 모델 초기화
encoder = Encoder(SRC_VOCAB_SIZE, embedding_dim, units)
decoder = Decoder(TGT_VOCAB_SIZE, embedding_dim, units)

# 샘플 입력 및 인코더 출력 확인
sequence_len = 30

sample_enc = tf.random.uniform((BATCH_SIZE, sequence_len)) # 랜덤한 인코더 입력 시퀀스를 생성
sample_output = encoder(sample_enc) # 인코더에 랜덤한 입력을 전달하여 인코더 출력을 확인

print ('Encoder Output:', sample_output.shape)

# 샘플 디코더 출력 및 어텐션 확인
sample_state = tf.random.uniform((BATCH_SIZE, units))

# 디코더에 랜덤한 입력과 hidden state를 전달하여 디코더 출력 및 어텐션 가중치를 확인
sample_logits, h_dec, attn = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                     sample_state, sample_output)

print ('Decoder Output:', sample_logits.shape)
print ('Decoder Hidden State:', h_dec.shape)
print ('Attention:', attn.shape)

### 모델 훈련

#### Optimizer & Loss
- tf.math.logical_not(tf.math.equal(real, 0))는 TensorFlow에서 사용되는 텐서 연산으로, 패딩된 부분을 제외한 유효한 데이터를 나타내는 마스크를 생성하는 역할을 함

In [None]:
# 옵티마이저 및 손실함수 초기화
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none') # 신경망 출력이 로짓인 경우에 사용
# reduction='none'은 손실을 각 샘플에 대해 계산하고, 나중에 모든 샘플에 대한 평균을 계산할 때 사용

def loss_function(real, pred):
    # 패딩된 부분은 고려하지 않도록 하기 위해 패딩된 부분에 대한 마스크를 생성
    # 패딩은 정수 0으로 표현되고, 이를 기준으로 마스크를 생성
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    
    # 실제 레이블과 모델의 예측값 간의 손실을 계산
    loss = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss.dtype) # 마스크를 손실 데이터 형식으로 변환
    loss *= mask # 손실에 마스크를 곱하여 패딩된 부분의 손실을 0으로 만듬
    
    return tf.reduce_mean(loss)

#### train_step 구현
- train_step()은 학습에 필요한 것을 모두 가져가 Loss를 계산한 후 반환하는 함수
- 학습 과정

    1) Encoder에 소스 문장을 전달해 컨텍스트 벡터인 enc_out 을 생성  
    
    2) t=0일 때, Decoder의 Hidden State는 Encoder의 Final State로 정의. h_dec = enc_out[:, -1]  
    
    3) Decoder에 입력으로 전달할 \<start> 토큰 문장 생성  
    
    4) \<start> 문장과 enc_out, Hidden State를 기반으로 다음 단어(t=1)를 예측. pred  
    
    5) 예측된 단어와 정답 간의 Loss를 구한 후, t=1의 정답 단어를 다음 입력으로 사용 (예측 단어 X)  
    
    6) 반복!

In [None]:
@tf.function

# src: 입력 소스 문장의 텐서
# tgt: 목표 타겟 문장의 텐서
# dec_tok: 디코더의 토크나이저 객체
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    bsz = src.shape[0]
    loss = 0

    # 그래디언트 계산
    with tf.GradientTape() as tape: 
        enc_out = encoder(src) #  주어진 소스 문장에 대해 인코더를 실행하여 인코더 출력을 얻음
        h_dec = enc_out[:, -1] # 디코더의 초기 hidden state를 인코더 출력의 마지막 시점으로 설정
        
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1) # start 토큰 추가

        # 디코딩 루프 : 디코더를 반복적으로 실행해서 예측을 생성하고 손실을 계산
        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            loss += loss_function(tgt[:, t], pred) # 각 시점에서의 손실을 누적
            
            # 다음 시점의 디코더 입력으로 현재 타임 스텝의 실제 값을 사용
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1])) # 전체 손실을 타임 스텝 수로 나누어 배치 손실 계산

    variables = encoder.trainable_variables + decoder.trainable_variables # 훈련 가능한 변수 가져옴
    gradients = tape.gradient(loss, variables) # 그래디언트를 계산
    optimizer.apply_gradients(zip(gradients, variables)) # 옵티마이저를 사용해서 모델의 기울기를 업데이트
    
    return batch_loss # 배치 손실 반환

이러한 훈련 스텝 함수는 Seq2Seq 모델의 훈련 과정 중 한 스텝을 수행하며, 주어진 데이터 배치에 대해 손실을 계산하고 모델을 업데이트함

#### 훈련 시작하기


In [None]:
from tqdm import tqdm    # 작업 진행 상태 모니터링
import random

EPOCHS = 10

for epoch in range(EPOCHS):
    total_loss = 0
    
    # : 인덱스 리스트를 생성하여 데이터셋을 미니배치로 나누기 위한 인덱스를 미리 계산
    idx_list = list(range(0, enc_train.shape[0], BATCH_SIZE)) # 각 배치의 시작 인덱스
    random.shuffle(idx_list)
    t = tqdm(idx_list)    # tqdm
    
    # 미니 배치 순회
    for (batch, idx) in enumerate(t): 
        batch_loss = train_step(enc_train[idx:idx+BATCH_SIZE],
                                dec_train[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
    
        total_loss += batch_loss
        
        # tqdm에 현재 에포크 번호를 표시
        t.set_description_str('Epoch %2d' % (epoch + 1))    # tqdm
        # 현재까지의 평균 손실을 표시
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))    # tqdm

이러한 훈련 루프를 통해 모델은 주어진 에포크 동안 전체 훈련 데이터셋에 대해 반복적으로 학습되며, tqdm을 사용하여 진행 상태를 모니터링함

In [None]:
# Validation Set을 사용하는 eval_step() 함수를 정의
# # eval_step() 정의하기
# train_step() 이후 eval_step() 진행하도록 소스 수정하기

@tf.function
def eval_step(src, tgt, encoder, decoder, dec_tok):
    bsz = src.shape[0]
    loss = 0

    enc_out = encoder(src)

    h_dec = enc_out[:, -1]
    
    dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * bsz, 1)

    for t in range(1, tgt.shape[1]):
        pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

        loss += loss_function(tgt[:, t], pred)
        dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))
    
    return batch_loss


# Training Process

from tqdm import tqdm

EPOCHS = 10

for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, enc_train.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (batch, idx) in enumerate(t):
        batch_loss = eval_step(enc_train[idx:idx+BATCH_SIZE], # 데이터셋이 evaluation을 위한 데이터셋으로 변경되어야 함
                                dec_train[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
    
        total_loss += batch_loss
        
        t.set_description_str('Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))
    
    test_loss = 0
    
    idx_list = list(range(0, enc_val.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (test_batch, idx) in enumerate(t):
        test_batch_loss = eval_step(enc_val[idx:idx+BATCH_SIZE],
                                    dec_val[idx:idx+BATCH_SIZE],
                                    encoder,
                                    decoder,
                                    dec_tokenizer)
    
        test_loss += test_batch_loss

        t.set_description_str('Test Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Test Loss %.4f' % (test_loss.numpy() / (test_batch + 1)))

In [None]:
def evaluate(sentence, encoder, decoder):
    attention = np.zeros((dec_train.shape[-1], enc_train.shape[-1]))
    
    sentence = preprocess_sentence(sentence)
    inputs = enc_tokenizer.texts_to_sequences([sentence.split()])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                           maxlen=enc_train.shape[-1],
                                                           padding='post')

    result = ''

    enc_out = encoder(inputs)

    dec_hidden = enc_out[:, -1]
    dec_input = tf.expand_dims([dec_tokenizer.word_index['<start>']], 0)

    for t in range(dec_train.shape[-1]):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention[t] = attention_weights.numpy()

        predicted_id = \
        tf.argmax(tf.math.softmax(predictions, axis=-1)[0]).numpy()

        result += dec_tokenizer.index_word[predicted_id] + ' '

        if dec_tokenizer.index_word[predicted_id] == '<end>':
            return result, sentence, attention

        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention


def plot_attention(attention, sentence, predicted_sentence):
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    fontdict = {'fontsize': 14}

    ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
    ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict)

    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()


def translate(sentence, encoder, decoder):
    result, sentence, attention = evaluate(sentence, encoder, decoder)

    print('Input: %s' % (sentence))
    print('Predicted translation: {}'.format(result))
    
    attention = attention[:len(result.split()), :len(sentence.split())]
    plot_attention(attention, sentence.split(), result.split(' '))


translate("Can I have some coffee?", encoder, decoder)