In [3]:

import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import matplotlib.pyplot as plt
import seaborn as sns

english_sentences = [
    'i love you',
    'he is a student',
    'she likes music',
    'we are learning attention',
    'you are amazing',    
]

french_sentences = [
    '<start> je t aime <end>',
    '<start> il est etudiant <end>',
    '<start> elle aime la musique <end>',
    '<start> nous apprenons l attention <end>',
    '<start> tu es incroyable <end>',    
]

In [4]:
from tensorflow.keras.layers import Input, LSTM, Dense,Embedding
# Funtional API

# 하이퍼파라미터
embedding_dim = 64
units = 128

# ========== ENCODER ==========
encoder_inputs = Input(shape=(max_eng_len,), name='encoder_input')
encoder_embedding = Embedding(eng_vocab_size, embedding_dim, name='encoder_embedding')(encoder_inputs)

# return_sequences=True: 모든 타임스텝의 hidden state 반환 (Attention 계산에 필요)
# return_state=True: 마지막 hidden state와 cell state 반환 (Decoder 초기화에 사용)
encoder_lstm = LSTM(units, return_sequences=True, return_state=True, name='encoder_lstm')
encoder_outputs, state_h, state_c = encoder_lstm(encoder_embedding)

# encoder_states = [state_h, state_c]는 Decoder의 초기 상태로 전달
encoder_states = [state_h, state_c]

print(f"✅ Encoder 구성 완료")
print(f"   - Encoder Outputs Shape: (batch, {max_eng_len}, {units})")
print(f"   - Hidden State Shape: (batch, {units})")
print(f"   - Cell State Shape: (batch, {units})")

# ========== DECODER ==========
max_decoder_len = max_fra_len - 1
decoder_inputs = Input(shape=(max_decoder_len,), name='decoder_input')
decoder_embedding = Embedding(fra_vocab_size, embedding_dim, name='decoder_embedding')(decoder_inputs)

# return_sequences=True: 모든 타임스텝 출력 (각 시점마다 Attention 적용)
# return_state=True: Inference 시 상태 전달용
decoder_lstm = LSTM(units, return_sequences=True, return_state=True, name='decoder_lstm')
decoder_outputs, _, _ = decoder_lstm(decoder_embedding, initial_state=encoder_states)

print(f"✅ Decoder 구성 완료")
print(f"   - Decoder Outputs Shape: (batch, {max_fra_len}, {units})")

NameError: name 'max_eng_len' is not defined

In [None]:
from tensorflow.keras.layers import Layer
# 디코더의 현재상태(query)와 인코더의 모든 히든스테 (value / keys)를 비교해서
# 각 인코더 타임스텝에 대한 중요도(가중치)를 계산하고, 그 가중치로 인코더 출력을 가중합해 context vector을 얻는다
class CAttention(Layer):
    def __init__(self, units, **kwargs):
        super(CAttention, self).__init__(**kwargs)
        self.units = units
    def build(self, input_shape):
        # W1: Query 변환 가중치 (decoder hidden state → attention space)
        self.W1 = self.add_weight(name='W1',
                                   shape=(input_shape[0][-1], self.units),
                                   initializer='glorot_uniform',
                                   trainable=True)
        
        # W2: Key 변환 가중치 (encoder hidden states → attention space)
        self.W2 = self.add_weight(name='W2',
                                   shape=(input_shape[1][-1], self.units),
                                   initializer='glorot_uniform',
                                   trainable=True)
        
        # V: Score를 스칼라로 변환
        self.V = self.add_weight(name='V',
                                  shape=(self.units, 1),
                                  initializer='glorot_uniform',
                                  trainable=True)
        super(CAttention, self).build(input_shape)
        
    def call(self, inputs):
        """
        Args:
            query: Decoder hidden state (batch, decoder_units)
            values: Encoder hidden states (batch, max_eng_len, encoder_units)
        
        Returns:
            context_vector: (batch, encoder_units)
            attention_weights: (batch, max_eng_len, 1)
        """
        query, values = inputs
        
        # Query 차원 확장: (batch, decoder_units) → (batch, 1, decoder_units)
        query_with_time_axis = tf.expand_dims(query, 1)
        
        # Score 계산: tanh(W1*Q + W2*K)
        # W1*Q: (batch, 1, units)
        # W2*K: (batch, max_eng_len, units)
        score = tf.nn.tanh(
            tf.matmul(query_with_time_axis, self.W1) + tf.matmul(values, self.W2)
        )
        # score shape: (batch, max_eng_len, units)
        
        # V를 곱해서 스칼라 score로 변환
        attention_logits = tf.matmul(score, self.V)
        # shape: (batch, max_eng_len, 1)
        
        # Softmax로 확률 분포 변환 (합이 1)
        attention_weights = tf.nn.softmax(attention_logits, axis=1)
        
        # Context vector 계산: 가중 합
        # attention_weights: (batch, max_eng_len, 1)
        # values: (batch, max_eng_len, encoder_units)
        context_vector = attention_weights * values
        context_vector = tf.reduce_sum(context_vector, axis=1)
        # shape: (batch, encoder_units)
        
        return context_vector, attention_weights
    
    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config

In [None]:
# Attention Layer
attention_layer = CAttention(units=10, name='attention')
attention_layer

In [None]:
# Attention을 각 디코더 타임스텝에 적용
def apply_attention(inputs):
    """
    각 타임스텝마다 Attention 계산
    """
    encoder_outputs, decoder_outputs = inputs
    
    # 타임스텝별로 순회하며 Context Vector 생성
    context_vectors = []
    attention_weights_list = []
    
    for t in range(max_decoder_len):
        # t 시점의 decoder hidden state 추출
        decoder_hidden_t = decoder_outputs[:, t, :]
        
        # Attention 계산
        context_vector, attention_weights = attention_layer([decoder_hidden_t, encoder_outputs])
        context_vectors.append(context_vector)
        attention_weights_list.append(attention_weights)
    
    # (batch, max_fra_len, encoder_units)로 재구성
    context_vectors = tf.stack(context_vectors, axis=1)
    attention_weights_all = tf.stack(attention_weights_list, axis=1)
    
    return context_vectors, attention_weights_all

from tensorflow.keras.layers import Concatenate,Dense,Lambda
from tensorflow.keras.models import Model
# Lambda Layer로 래핑
attention_result = Lambda(apply_attention, name='apply_attention')([encoder_outputs, decoder_outputs])
context_vectors, attention_weights_all = attention_result[0], attention_result[1]

# Decoder Output + Context Vector 결합
decoder_combined = Concatenate(axis=-1, name='concat')([decoder_outputs, context_vectors])

# 최종 출력 레이어 (단어 확률 분포)
output_layer = Dense(fra_vocab_size, activation='softmax', name='output')
outputs = output_layer(decoder_combined)


print(f"   - Context Vector Shape: (batch, {max_fra_len}, {units})")
print(f"   - Combined Shape: (batch, {max_fra_len}, {units * 2})")
print(f"   - Output Shape: (batch, {max_fra_len}, {fra_vocab_size})")

In [None]:
model = Model([encoder_inputs,decoder_inputs], outputs,name = 'attention_seq2seq')
model.compile(
    optimizer = 'rmsprop',
    loss = 'sparse_categorical_crossentropy',
    metrics=['accuracy']
)
model.summary()

# 학습데이터 준비 (Teacher Forcing)
decoder_input_data = fra_padded[ : , : -1 ]   #  <start> + 문장 ( 마지막 토큰 제외)
decoder_target_data = fra_padded[ : , 1 : ]  # 문장 + <end>  (첫 토큰 제외)
print(f'encoder input : {eng_padded.shape}')
print(f'decoder input : {decoder_input_data.shape}')
print(f'decoder target : {decoder_target_data.shape}')
# 학습 실행
history = model.fit(
    [eng_padded, decoder_input_data],
    np.expand_dims(decoder_target_data, -1),
    batch_size = 2,
    epochs = 1000,
    verbose=0
)

print(f"loss : {history.history['loss'][-1]}")
print(f"accuracy : {history.history['accuracy'][-1]}")

In [None]:
def translate(input_sentence):
    """
    영어 문장을 프랑스어로 번역
    """
    # 입력 문장 전처리
    input_seq = eng_tokenizer.texts_to_sequences([input_sentence])
    input_seq = pad_sequences(input_seq, maxlen=max_eng_len, padding='post')
    
    # 디코더 입력 초기화 (<start> 토큰)
    start_token_id = fra_tokenizer.word_index['<start>']
    end_token_id = fra_tokenizer.word_index['<end>']
    
    # 디코더 입력: <start> + 패딩
    decoder_input = np.zeros((1, max_decoder_len))
    decoder_input[0, 0] = start_token_id
    
    # 번역 생성
    output_sentence = []
    
    for t in range(1, max_decoder_len):
        # 예측
        predictions = model.predict([input_seq, decoder_input], verbose=0)
        
        # t-1 시점의 예측 결과에서 가장 높은 확률의 단어 선택
        predicted_id = np.argmax(predictions[0, t-1, :])
        
        # <end> 토큰이면 종료
        if predicted_id == end_token_id:
            break
        
        # 단어 추가
        predicted_word = fra_tokenizer.index_word.get(predicted_id, '')
        if predicted_word not in ['<start>', '<end>', '']:
            output_sentence.append(predicted_word)
        
        # 다음 입력으로 사용
        decoder_input[0, t] = predicted_id
    
    return ' '.join(output_sentence)

# 테스트
print("\n번역 결과:\n")
for i in range(len(english_sentences)):
    translation = translate(english_sentences[i])
    print(f"영어: {english_sentences[i]}")
    print(f"원본: {french_sentences[i]}")
    print(f"번역: {translation}")
    print("-" * 40)

# for eng_sent in english_sentences:
#     translation = translate(eng_sent)
#     print(f"영어: {eng_sent}")
#     print(f"번역: {translation}")
#     print("-" * 40)