In [None]:
# Transformer
# Seq2Seq_attention_번역_Q

# Seq2Seq 모델 + Attention
# Transformer 모델과는 다름

In [None]:
# 4. 실습2 - Seq2Seq 모델 + Attention

# RNN 기반 Seq2Seq 모델에 Attention 을 추가

import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Concatenate, Dot, Activation, Lambda, Softmax
from tensorflow.keras.optimizers import RMSprop

lines = pd.read_csv('fra.txt', names=['src', 'tar', 'lic'], sep='\t')
del lines['lic']
print(f"전체 샘플의 개수: {len(lines)}")

lines = lines.loc[:, 'src':'tar']
lines = lines[0:30000]
display(lines.sample(10))

lines.tar = lines.tar.apply(lambda x: '\t' + x + '\n')
display(lines.sample(10))

In [None]:
# 문자 집합 구축
src_vocab = set()
for line in lines.src:
  for char in line:
    src_vocab.add(char)

tar_vocab = set()
for line in lines.tar:
  for char in line:
    tar_vocab.add(char)

src_vocab_size = len(src_vocab) + 1
tar_vocab_size = len(tar_vocab) + 1
print(f"source 문장의 char집합: {src_vocab_size}")
print(f"target 문장의 char집합: {tar_vocab_size}\n")

src_vocab = sorted(list(src_vocab))
tar_vocab = sorted(list(tar_vocab))
print(f"{src_vocab[:10]}")
print(f"{tar_vocab[:10]}\n")

In [None]:
# 문장 인덱스화

src_to_index = dict([(word, i+1) for i, word in enumerate(src_vocab)])
tar_to_index = dict([(word, i+1) for i, word in enumerate(tar_vocab)])
print(f"{src_to_index}")
print(f"{tar_to_index}\n")

encoder_input = []
for line in lines.src:
  encoded_line = []
  for char in line:
    encoded_line.append(src_to_index[char])
  encoder_input.append(encoded_line)
print(f"원래 source 문장:\n{lines.src[:3]}")
print(f"인코더 입력 시퀀스: {encoder_input[:3]}\n")

decoder_input = []
for line in lines.tar:
  encoded_line = []
  for char in line:
    encoded_line.append(tar_to_index[char])
  decoder_input.append(encoded_line)
print(f"디코더 입력 시퀀스: {decoder_input[:3]}\n")

decoder_target = []
for line in lines.tar:
  timestep = 0
  encoded_line = []
  for char in line:
    if timestep > 0:
      encoded_line.append(tar_to_index[char])
    timestep = timestep + 1
  decoder_target.append(encoded_line)
print(f"디코더 정답 시퀀스: {decoder_target[:3]}\n")

In [None]:
# 제로 패딩을 적용하기 위한 최대 길이
# + One-Hot Encoding

max_src_len = max([len(line) for line in lines.src])
max_tar_len = max([len(line) for line in lines.tar])
print(f"source 문장의 최대 길이: {max_src_len}")
print(f"target 문장의 최대 길이: {max_tar_len}")

encoder_input = pad_sequences(
  encoder_input,
  maxlen=max_src_len,
  padding='post'
)
decoder_input = pad_sequences(
  decoder_input,
  maxlen=max_tar_len,
  padding='post'
)
decoder_target = pad_sequences(
  decoder_target,
  maxlen=max_tar_len,
  padding='post'
)
encoder_input = to_categorical(encoder_input)
decoder_input = to_categorical(decoder_input)
decoder_target = to_categorical(decoder_target)

In [None]:
# 모델 생성
# Attention lyaer 를 추가한 Seq2Seq 모델 사용

class AttentionLayer(tf.keras.layers.Layer):
  def __init__(self):
    super(AttentionLayer, self).__init__()
    
  def call(self, query, key, value):
    scores = tf.matmul(query, key, transpose_b=True)
    attention_weights = Softmax(axis=-1)(scores)
    context_vector = tf.matmul(attention_weights, value)
    return context_vector, attention_weights

In [None]:
# 인코더 정의 
encoder_inputs = Input(shape=(None, src_vocab_size)) 
encoder_lstm = LSTM(256, return_sequences=True, return_state=True) 
encoder_outputs, state_h, state_c = encoder_lstm(encoder_inputs) 
 
# 디코더 정의 
decoder_inputs = Input(shape=(None, tar_vocab_size)) 
decoder_lstm = LSTM(256, return_sequences=True, return_state=True) 
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=[state_h, state_c]) 

In [None]:
# 어텐션 레이어 추가 

# AttentioLayer  선언 
attention_layer = AttentionLayer() 

# context_vector, attention_weights 에 출력 담기
context_vector, attention_weights = attention_layer(decoder_outputs, encoder_outputs, encoder_outputs)

# 컨텍스트 벡터와 디코더 출력을 연결
decoder_concat_input = Concatenate(axis=-1)([context_vector, decoder_outputs])

# 출력 레이어 
decoder_dense = Dense(tar_vocab_size, activation='softmax') 
decoder_outputs = decoder_dense(decoder_concat_input)

In [None]:
# 모델 정의
model = Model(
  [encoder_inputs, decoder_inputs],
  decoder_outputs
)
model.compile(
  optimizer=RMSprop(),
  loss='categorical_crossentropy',
  metrics=['accuracy']
)
model.summary()

# 모델 학습 
model.fit(
  [encoder_input, decoder_input],
  decoder_target,
  batch_size=64,
  epochs=2,
  validation_split=0.2 
)

In [None]:
# 추론용 인코더, 디코더 모델 재정의

# 인코더 모델
encoder_model = Model(encoder_inputs, [encoder_outputs, state_h, state_c])

# 디코더
# 입력 정의
decoder_state_input_h = Input(shape=(256,), name="decoder_state_input_h")
decoder_state_input_c = Input(shape=(256,), name="decoder_state_input_c")

# 디코더 LSTM
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=[decoder_state_input_h, decoder_state_input_c])

# 어텐션 레이어 추가
context_vector, attention_weights = attention_layer(decoder_outputs, encoder_outputs, encoder_outputs)

# 컨텍스트 벡터와 디코더 출력을 결합
decoder_concat_input = Concatenate(name="concatenate_layer")([context_vector, decoder_outputs])

# 최종 출력 레이어
decoder_final_output = decoder_dense(decoder_concat_input)

# 디코더 모델 생성
decoder_model = Model(
  inputs=[decoder_inputs, encoder_outputs, decoder_state_input_h, decoder_state_input_c],
  outputs=[decoder_final_output, state_h, state_c, attention_weights]
)

# 오류 메시지에 나타난 문제를 디버깅하기 위해 모델의 개요를 출력
encoder_model.summary()

In [None]:
# 예측값 디코딩

index_to_tar = dict((i, char) for char, i in tar_to_index.items())

def decode_sequence(input_seq):
  # 인코더의 상태를 얻음
  encoder_output, state_h, state_c = encoder_model.predict(input_seq)

  # 디코더의 초기 입력 (시작 심볼)
  target_seq = np.zeros((1, 1, tar_vocab_size))
  target_seq[0, 0, tar_to_index['\t']] = 1.

  # 디코딩 루프
  stop_condition = False
  decoded_sentence = ''
  while not stop_condition:
    output_tokens, h, c, a = decoder_model.predict([target_seq, encoder_output, state_h, state_c])

    # 샘플링
    sampled_token_index = np.argmax(output_tokens[0, -1, :])
    sampled_char = index_to_tar[sampled_token_index]
    decoded_sentence += sampled_char

    # 종료 조건: 최대 길이 초과 또는 종료 심볼
    if (sampled_char == '\n' or len(decoded_sentence) > max_tar_len):
      stop_condition = True

    # 다음 디코더 입력 업데이트
    target_seq = np.zeros((1, 1, tar_vocab_size))
    target_seq[0, 0, sampled_token_index] = 1.

    # 상태 업데이트
    state_h, state_c = h, c

  return decoded_sentence

In [None]:
# 예측

for seq_index in [3, 50, 100, 300, 1001]:
  input_seq = encoder_input[seq_index:seq_index+1]
  decoded_sentence = decode_sequence(input_seq)
  print(f"입력 문장: {lines.src[seq_index]}")
  print(f"정답 문장: {lines.tar[seq_index][1:len(lines.tar[seq_index])]}")
  print(f"번역 문장: {decoded_sentence[0:len(decoded_sentence)]}")