In [1]:
import tensorflow as tf
import numpy as np

from sklearn.model_selection import train_test_split

import matplotlib.ticker as ticker
import matplotlib.pyplot as plt

import time
import re
import os
import io

print(tf.__version__)

2.6.0


In [2]:
path_to_zip = tf.keras.utils.get_file(
    'spa-eng.zip',
    origin='http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip',
    extract=True)

path_to_file = os.path.dirname(path_to_zip)+"/spa-eng/spa.txt"

In [6]:
with open(path_to_file, "r") as f:
    raw = f.read().splitlines()

print("Data Size:", len(raw))
print("Example:")

for sen in raw[0:100][::20]: print(">>", sen)

Data Size: 118964
Example:
>> Go.	Ve.
>> Wait.	Esperen.
>> Hug me.	Abrázame.
>> No way!	¡Ni cagando!
>> Call me.	Llamame.


In [3]:
def preprocess_sentence(sentence, s_token=False, e_token=False):
    sentence = sentence.lower().strip()

    sentence = re.sub(r"([?.!,])", r" \1 ", sentence)    #?.!, 기준으로 공백을 앞뒤로 추가
    sentence = re.sub(r'[" "]+', " ", sentence)          # 중복된 공백 제거
    sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)  # 알파벳,?.!, 이외는 공백으로 치환

    sentence = sentence.strip()                          # 불필요한 공백 제거

    if s_token:
        sentence = '<start> ' + sentence

    if e_token:
        sentence += ' <end>'
    
    return sentence



In [4]:
def tokenize(corpus):
    tokenizer = tf.keras.preprocessing.text.Tokenizer(filters='')
    tokenizer.fit_on_texts(corpus)

    tensor = tokenizer.texts_to_sequences(corpus)

    tensor = tf.keras.preprocessing.sequence.pad_sequences(tensor, padding='post')

    return tensor, tokenizer


In [7]:
enc_corpus = []
dec_corpus = []

num_examples = 30000

for pair in raw[:num_examples]:
    eng, spa = pair.split("\t")

    enc_corpus.append(preprocess_sentence(eng))
    dec_corpus.append(preprocess_sentence(spa, s_token=True, e_token=True))

print("English:", enc_corpus[100])   # go away !
print("Spanish:", dec_corpus[100])   # <start> salga de aqu ! <end>

English: go away !
Spanish: <start> salga de aqu ! <end>


In [9]:
# 토큰화하기
# train_test_split을 활용해서 훈련 데이터와 검증 데이터로 분리하기
# 1. 토큰화
input_tensor, enc_tokenizer = tokenize(enc_corpus)      # 영어 문장
target_tensor, dec_tokenizer = tokenize(dec_corpus)    # 스페인어 문장

# 2. 훈련/검증 데이터 분할
input_train, input_val, target_train, target_val = train_test_split(
    input_tensor, target_tensor, test_size=0.2, random_state=42
)

# 3. 확인
print(f"훈련 데이터 개수: {len(input_train)}")
print(f"검증 데이터 개수: {len(input_val)}")

훈련 데이터 개수: 24000
검증 데이터 개수: 6000


In [10]:
class BahdanauAttention(tf.keras.layers.Layer):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.w_dec = tf.keras.layers.Dense(units)
        self.w_enc = tf.keras.layers.Dense(units)
        self.w_com = tf.keras.layers.Dense(1)
    
    def call(self, h_enc, h_dec):
        # h_enc shape: [batch x length x units]
        # h_dec shape: [batch x units]

        h_enc = self.w_enc(h_enc)
        h_dec = tf.expand_dims(h_dec, 1)
        h_dec = self.w_dec(h_dec)

        score = self.w_com(tf.nn.tanh(h_dec + h_enc))
        
        attn = tf.nn.softmax(score, axis=1)

        context_vec = attn * h_enc
        context_vec = tf.reduce_sum(context_vec, axis=1)

        return context_vec, attn


    

In [25]:
class Encoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        self.enc_units = enc_units
        
        # 임베딩 
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
       
        # GRU 층
        self.gru = tf.keras.layers.GRU(
            enc_units,
            return_sequences=True,   # 전체 시퀀스 hidden state 반환 (Attention용)
            return_state=True,       # 마지막 hidden state 반환
            recurrent_initializer='glorot_uniform'
        )
        
    def call(self, x, hidden_state=None):     
        emb = self.embedding(x)  # [batch, seq_len, embedding_dim]

        # 초기 상태가 없다면 0으로 초기화
        if hidden_state is None:
            hidden_state = self.initialize_hidden_state(batch_size=tf.shape(emb)[0])
                    
        enc_output,state = self.gru(emb, initial_state=hidden_state) # state: [batch, enc_units]
       
        # 인코더의 모든 time step의 hidden states [batch, seq_len, units]
        return enc_output  # ← 단일 텐서만 반환!  
        
    
    def initialize_hidden_state(self, batch_size):
        return [tf.zeros((batch_size, self.enc_units))]

In [20]:
class Decoder(tf.keras.Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        
        self.dec_units = dec_units
        
        # 임베딩 
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        
        # TODO: Awesome Decoder Modules
        self.attention = BahdanauAttention( dec_units)   # Attention 필수 사용!
                   
        # GRU 층
        self.gru = tf.keras.layers.GRU(
            dec_units,
            return_sequences=True,   # 전체 시퀀스 hidden state 반환 (Attention용)
            return_state=True,       # 마지막 hidden state 반환
            recurrent_initializer='glorot_uniform'
        )
        
        self.fc = tf.keras.layers.Dense(vocab_size)  # 출력 단어 예측용

    def call(self, x, h_dec, enc_out):      
        # 1. 디코더 입력 임베딩
        x = self.embedding(x)  # [batch, 1, embedding_dim]
        
        # 2. Attention 계산 (self, h_enc, h_dec):
        context_vector, attention_weights = self.attention(enc_out, h_dec)  # context: [batch, enc_units]

        # 3. context_vector와 임베딩 결합
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)  # [batch, 1, embedding_dim + enc_units]

        # 4. GRU 통과
        output, state = self.gru(x)  # output: [batch, 1, dec_units]

        # 5. 출력 단어 예측
        output = tf.reshape(output, (-1, output.shape[2]))  # [batch, dec_units]
        x = self.fc(output)  # [batch, vocab_size]
        
        return x, state, attention_weights

In [26]:
# 코드를 실행하세요.

BATCH_SIZE     = 64
SRC_VOCAB_SIZE = len(enc_tokenizer.index_word) + 1
TGT_VOCAB_SIZE = len(dec_tokenizer.index_word) + 1

units         = 1024
embedding_dim = 512

encoder = Encoder(SRC_VOCAB_SIZE, embedding_dim, units)
decoder = Decoder(TGT_VOCAB_SIZE, embedding_dim, units)

# sample input
sequence_len = 30

sample_enc = tf.random.uniform((BATCH_SIZE, sequence_len))
sample_output = encoder(sample_enc)

print ('Encoder Output:', sample_output.shape)

sample_state = tf.random.uniform((BATCH_SIZE, units))

sample_logits, h_dec, attn = decoder(tf.random.uniform((BATCH_SIZE, 1)),
                                     sample_state, sample_output)

print ('Decoder Output:', sample_logits.shape)
print ('Decoder Hidden State:', h_dec.shape)
print ('Attention:', attn.shape)

Encoder Output: (64, 30, 1024)
Decoder Output: (64, 8894)
Decoder Hidden State: (64, 1024)
Attention: (64, 30, 1)


In [27]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
    # from_logits=True: y_pred가 softmax 되기 전 값 (logits : softmax를 거치기 직전값)이기 때문
    # → 손실 함수 내부에서 softmax를 자동으로 처리
    # reduction='none': 손실을 일괄 평균하지 않고, 토큰마다 개별 손실 계산
    # → 나중에 마스크를 씌워서 패딩 위치 손실은 제거할 수 있게 함

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0)) # 패딩(0) 위치는 False
    loss = loss_object(real, pred)
    
    mask = tf.cast(mask, dtype=loss.dtype)             # Boolean mask → float32
    loss *= mask                                       # 패딩 위치는 손실 0으로 만듦
    
    return tf.reduce_mean(loss)



**훈련 Step**

In [29]:
@tf.function
def train_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    batch_size = src.shape[0]
    loss = 0

    with tf.GradientTape() as tape:
        enc_out = encoder(src)
        h_dec = enc_out[:, -1]   # 마지막 time step의 hidden state를 디코더 초기값으로 사용
        
        # 디코더 입력 초기값: <start> 토큰
        dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * batch_size, 1)

        for t in range(1, tgt.shape[1]):
            pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

            # 손실 누적 (teacher forcing)
            loss += loss_function(tgt[:, t], pred)
            # 다음 디코더 입력은 정답 (teacher forcing)
            dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))

    # 역전파 및 최적화
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    
    return batch_loss

**검증 Step**

In [30]:
@tf.function
def eval_step(src, tgt, encoder, decoder, optimizer, dec_tok):
    batch_size = src.shape[0]
    loss = 0

    enc_out = encoder(src)
    h_dec = enc_out[:, -1]   # 마지막 time step의 hidden state를 디코더 초기값으로 사용
        
    # 디코더 입력 초기값: <start> 토큰
    dec_src = tf.expand_dims([dec_tok.word_index['<start>']] * batch_size, 1)

    for t in range(1, tgt.shape[1]):
        pred, h_dec, _ = decoder(dec_src, h_dec, enc_out)

        # 손실 누적 (teacher forcing)
        loss += loss_function(tgt[:, t], pred)
        # 다음 디코더 입력은 정답 (teacher forcing)
        dec_src = tf.expand_dims(tgt[:, t], 1)
        
    batch_loss = (loss / int(tgt.shape[1]))
  
    return batch_loss

**훈련 후 검증**

In [None]:
from tqdm import tqdm    # tqdm은 훈련의 진행 과정을 한눈에 볼 수 있게 해주는 라이브러리
import random


EPOCHS = 10

for epoch in range(EPOCHS):
    total_loss = 0
    
    idx_list = list(range(0, input_train.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)    # tqdm

    for (batch, idx) in enumerate(t):
        batch_loss = train_step(input_train[idx:idx+BATCH_SIZE],
                                target_train[idx:idx+BATCH_SIZE],
                                encoder,
                                decoder,
                                optimizer,
                                dec_tokenizer)
    
        total_loss += batch_loss
        
        t.set_description_str('Epoch %2d' % (epoch + 1))    # tqdm
        t.set_postfix_str('Loss %.4f' % (total_loss.numpy() / (batch + 1)))    # tqdm
    
    
    idx_list = list(range(0, input_val.shape[0], BATCH_SIZE))
    random.shuffle(idx_list)
    t = tqdm(idx_list)

    for (test_batch, idx) in enumerate(t):
        test_batch_loss = eval_step(input_val[idx:idx+BATCH_SIZE],
                                    target_val[idx:idx+BATCH_SIZE],
                                    encoder,
                                    decoder,
                                    dec_tokenizer)
    
        test_loss += test_batch_loss

        t.set_description_str('Test Epoch %2d' % (epoch + 1))
        t.set_postfix_str('Test Loss %.4f' % (test_loss.numpy() / (test_batch + 1)))

**Attention Map 시각화**

In [None]:
def evaluate(sentence, encoder, decoder, enc_train. dec_train):
    attention = np.zeros((dec_train.shape[-1], enc_train.shape[-1]))
    
    sentence = preprocess_sentence(sentence)
    inputs = enc_tokenizer.texts_to_sequences([sentence.split()])
    inputs = tf.keras.preprocessing.sequence.pad_sequences(inputs,
                                                           maxlen=enc_train.shape[-1],
                                                           padding='post')

    result = ''

    enc_out = encoder(inputs)

    dec_hidden = enc_out[:, -1]
    dec_input = tf.expand_dims([dec_tokenizer.word_index['<start>']], 0)

    for t in range(dec_train.shape[-1]):
        predictions, dec_hidden, attention_weights = decoder(dec_input,
                                                             dec_hidden,
                                                             enc_out)

        attention_weights = tf.reshape(attention_weights, (-1, ))
        attention[t] = attention_weights.numpy()

        predicted_id = \
        tf.argmax(tf.math.softmax(predictions, axis=-1)[0]).numpy()

        if dec_tokenizer.index_word[predicted_id] == '<end>':
            return result, sentence, attention
        
        result += dec_tokenizer.index_word[predicted_id] + ' '

        dec_input = tf.expand_dims([predicted_id], 0)

    return result, sentence, attention


def plot_attention(attention, sentence, predicted_sentence):
    fig = plt.figure(figsize=(10,10))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')

    fontdict = {'fontsize': 14}

    ax.set_xticklabels([''] + sentence, fontdict=fontdict, rotation=90)
    ax.set_yticklabels([''] + predicted_sentence, fontdict=fontdict)

    ax.xaxis.set_major_locator(ticker.MultipleLocator(1))
    ax.yaxis.set_major_locator(ticker.MultipleLocator(1))

    plt.show()


In [None]:
def translate(sentence, encoder, decoder):
    result, sentence, attention = evaluate(sentence, encoder, decoder, input_train, target_train)

    print('Input: %s' % (sentence))
    print('Predicted translation: {}'.format(result))
    
    attention = attention[:len(result.split()), :len(sentence.split())]
    plot_attention(attention, sentence.split(), result.split(' '))


translate("Can I have some coffee?", encoder, decoder)