<a href="https://colab.research.google.com/github/hws2002/Deep_Learning_with_Keras/blob/main/Chapter11/Chapter11_5_seq2seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Let's build an English-to-Spanish translation model.

First, download the dataset.

In [None]:
!wget http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
!unzip -q spa-eng.zip

--2025-01-27 13:24:12--  http://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip
Resolving storage.googleapis.com (storage.googleapis.com)... 172.253.118.207, 74.125.200.207, 74.125.130.207, ...
Connecting to storage.googleapis.com (storage.googleapis.com)|172.253.118.207|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2638744 (2.5M) [application/zip]
Saving to: ‘spa-eng.zip’


2025-01-27 13:24:14 (2.12 MB/s) - ‘spa-eng.zip’ saved [2638744/2638744]



Parse the file.

In [None]:
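import os
text_path = os.path.join("spa-eng", "spa.txt")
# The file contains accented characters; an explicit encoding avoids platform-dependent defaults.
with open(text_path, "r", encoding="utf-8") as f:
  lines = f.read().split("\n")[:-1]  # drop the empty string after the final newline

text_pairs = []
for line in lines:  # process one line at a time
  english, spanish = line.split("\t")
  # Wrap the target sentence in [start] and [end] markers
  spanish = "[start] " + spanish + " [end]"
  text_pairs.append((english, spanish))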

In [None]:
import random
print(random.choice(text_pairs))

('They work for me.', '[start] Trabajan para mí. [end]')


Split the pairs into training, validation, and test sets.

In [None]:
import random

random.shuffle(text_pairs)
num_val_samples = int(0.15  * len(text_pairs))
num_train_samples = len(text_pairs) -  2 * num_val_samples
train_pairs = text_pairs[:num_train_samples]
val_pairs = text_pairs[num_train_samples : num_train_samples + num_val_samples]
test_pairs = text_pairs[ num_train_samples + num_val_samples:]

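Next, prepare two TextVectorization layers, one for English and one for Spanish. The string preprocessing has to be customized:
* The [start] and [end] tokens added earlier must be preserved.
* Punctuation differs from language to language. To strip punctuation in the Spanish TextVectorization layer, the inverted question mark ¿ must be stripped as well.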

In [None]:
import tensorflow as tf
import string
import re

import keras
from keras import layers

# Strip every character in string.punctuation plus "¿", but keep "[" and "]"
# so that the [start] and [end] tokens survive.
strip_chars = string.punctuation + "¿"
strip_chars = strip_chars.replace("[","")
strip_chars = strip_chars.replace("]","")

def custom_standardization(input_string):
  lowercase = tf.strings.lower(input_string)
  # Remove every character listed in strip_chars
  return tf.strings.regex_replace(
      lowercase, f"[{re.escape(strip_chars)}]", "")

vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens = vocab_size,
    output_mode = "int",
    output_sequence_length = sequence_length,
)

target_vectorization = layers.TextVectorization(
    max_tokens = vocab_size,
    output_mode = "int",
    output_sequence_length = sequence_length + 1, # Spanish sentences get one extra token because training needs the target offset by one step
    standardize = custom_standardization,
)

train_english_texts = [pair[0] for pair in train_pairs]
train_spanish_texts = [pair[1] for pair in train_pairs]
source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_spanish_texts)
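
To see what `custom_standardization` does to a single sentence, here is a plain-Python analogue that needs no TensorFlow. The name `standardize_py` is made up for this illustration. Python's `str.lower` is Unicode-aware, so its behavior on non-ASCII capitals may differ from `tf.strings.lower`; treat this as a sketch rather than an exact replica.

```python
import re
import string

# Same character set as the notebook: all ASCII punctuation plus "¿",
# with "[" and "]" removed so the [start]/[end] markers survive.
strip_chars = (string.punctuation + "¿").replace("[", "").replace("]", "")
pattern = re.compile(f"[{re.escape(strip_chars)}]")

def standardize_py(text):
    """Plain-Python analogue of custom_standardization (illustration only)."""
    return pattern.sub("", text.lower())

print(standardize_py("[start] ¿Dónde está el baño? [end]"))
```

The brackets around `start` and `end` are kept, while `¿` and `?` are removed.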


Finally, convert the data into a tf.data pipeline.

In [None]:
batch_size = 64

def format_dataset(eng, spa):
  eng = source_vectorization(eng)
  spa = target_vectorization(spa)
  return ({
      'english' : eng,
      'spanish' : spa[:, :-1], # the input Spanish sentence omits the last token
  }, spa[:,1:]) # the target Spanish sentence is one step ahead; same length as the input (20 tokens)

def make_dataset(pairs):
  eng_texts, spa_texts = zip(*pairs) # unzip the pairs into two tuples
  eng_texts = list(eng_texts)
  spa_texts = list(spa_texts)

  dataset = tf.data.Dataset.from_tensor_slices((eng_texts, spa_texts))
  dataset = dataset.batch(batch_size)
  dataset = dataset.map(format_dataset, num_parallel_calls=4)
  return dataset.shuffle(2048).prefetch(16).cache()

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
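
The one-step offset in `format_dataset` is easier to see on a single short sequence. The sketch below uses made-up token ids and a length of 7 instead of 21; only the slicing mirrors the notebook.

```python
# Hypothetical token ids for "[start] w1 w2 w3 [end]" right-padded with 0s
# to length 7 (the notebook pads to sequence_length + 1 = 21).
spa = [2, 45, 17, 93, 3, 0, 0]

decoder_input = spa[:-1]  # what the decoder sees:  [start] w1 w2 w3 [end] pad
target = spa[1:]          # what it must predict:   w1 w2 w3 [end] pad pad

for step, (seen, expected) in enumerate(zip(decoder_input, target)):
    print(f"step {step}: input token {seen} -> target token {expected}")
```

Both slices have the same length, and at every step the target is the token that follows the current input token.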

Let's check the shapes in the dataset.

In [None]:
for inputs, targets in train_ds.take(1):
  print(f"inputs['english'].shape : {inputs['english'].shape}")
  print(f"inputs['spanish'].shape : {inputs['spanish'].shape}")
  print(f"targets.shape : {targets.shape}")

inputs['english'].shape : (64, 20)
inputs['spanish'].shape : (64, 20)
targets.shape : (64, 20)


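# Sequence-to-sequence model with an RNN

Before trying the Transformer, let's first build a sequence-to-sequence model with a recurrent neural network.  
The simplest approach is to keep the RNN's output at every timestep.  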

In [None]:
inputs = keras.Input(shape = (sequence_length,), dtype = "int64")
embedded = layers.Embedding(input_dim = vocab_size, output_dim = 128)(inputs)
x = layers.LSTM(32, return_sequences = True)(embedded)
outputs = layers.Dense(vocab_size, activation = "softmax")(x)
model = keras.Model(inputs, outputs)
model.summary()

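This approach has two issues.
1. The target sequence must always be the same length as the source sequence, which is rarely the case in practice. This is not a fatal problem, because either sequence can be padded to match the other's length.
2. Because an RNN processes its input step by step, the model only looks at tokens 0...N of the source sequence when predicting token N of the target sequence. For example, to translate "The weather is nice today" into the French "Il fait beau aujourd'hui", the model would have to predict "Il" from "The" alone and "Il fait" from "The weather" alone, which is impossible.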
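
The padding workaround for the first issue can be sketched in a few lines. `pad_to_length` and the token ids are hypothetical; in the notebook itself, `TextVectorization` handles padding through `output_sequence_length`.

```python
def pad_to_length(token_ids, length, pad_id=0):
    """Truncate or right-pad a list of token ids to exactly `length` items."""
    return (token_ids + [pad_id] * length)[:length]

source = [12, 7, 301, 9, 44]  # hypothetical ids for a 5-token source sentence
target = [5, 88, 23, 140]     # hypothetical ids for a 4-token target sentence

length = max(len(source), len(target))
print(pad_to_length(source, length))
print(pad_to_length(target, length))
```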

## Implementing a GRU-based encoder and decoder in Keras  
Unlike an LSTM, which has several state vectors, a GRU has a single state vector, so choosing GRU over LSTM makes things a bit simpler.  


### GRU-based encoder

In [None]:
from tensorflow import keras
from tensorflow.keras import layers

embed_dim = 256
latent_dim = 1024
source = keras.Input(shape = (None, ), dtype = "int64", name = 'english')
x = layers.Embedding(vocab_size, embed_dim, mask_zero = True)(source)
encoded_source = layers.Bidirectional(
    layers.GRU(latent_dim), merge_mode = "sum")(x)

### GRU-based decoder


In [None]:
past_target = keras.Input(shape= (None,), dtype = "int64", name = "spanish")
x = layers.Embedding(vocab_size, embed_dim, mask_zero = True)(past_target)
decoder_gru = layers.GRU(latent_dim, return_sequences = True)
x = decoder_gru(x, initial_state = encoded_source)
x = layers.Dropout(0.5)(x)
target_next_step = layers.Dense(vocab_size, activation = 'softmax')(x)
seq2seq_rnn = keras.Model([source, past_target], target_next_step)

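During training, the decoder receives the entire target sequence as input. But thanks to the step-by-step nature of the RNN, it uses only tokens 0…N of the input to predict token N of the target (which corresponds to the next token in the sequence, because the target is offset by one step). In other words, it predicts the future using only information from the past.  

Now let's start training.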
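
A toy recurrent cell shows this "past only" property. It is not the GRU used here, just a minimal sketch with made-up weights: changing later inputs leaves earlier outputs untouched.

```python
import math

def toy_rnn(inputs, w_h=0.5, w_x=1.0):
    """Minimal recurrent cell: h_t = tanh(w_h * h_{t-1} + w_x * x_t).

    Returns the state after every timestep (like return_sequences=True).
    """
    h, outputs = 0.0, []
    for x in inputs:
        h = math.tanh(w_h * h + w_x * x)
        outputs.append(h)
    return outputs

a = toy_rnn([0.1, 0.2, 0.3, 0.4])
b = toy_rnn([0.1, 0.2, -0.9, 0.7])  # same first two inputs, different future

print(a[:2] == b[:2])  # outputs for steps 0 and 1 do not depend on later inputs
print(a[2:] == b[2:])  # outputs from step 2 onward differ
```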

In [None]:
seq2seq_rnn.compile(
    optimizer = "rmsprop",
    loss = "sparse_categorical_crossentropy",
    metrics = ['accuracy']
)
seq2seq_rnn.fit(train_ds, epochs = 15, validation_data = val_ds)

Epoch 1/15
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 214s 157ms/step - accuracy: 0.1414 - loss: 5.2610 - val_accuracy: 0.1582 - val_loss: 3.8559
Epoch 2/15
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 258s 159ms/step - accuracy: 0.1609 - loss: 3.8720 - val_accuracy: 0.1904 - val_loss: 3.2240
Epoch 3/15
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 205s 157ms/step - accuracy: 0.1864 - loss: 3.3129 - val_accuracy: 0.2075 - val_loss: 2.8912
Epoch 4/15
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 204s 157ms/step - accuracy: 0.2032 - loss: 2.9396 - val_accuracy: 0.2232 - val_loss: 2.6210
Epoch 5/15
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 263s 158ms/step - accuracy: 0.2174 - loss: 2.6557 - val_accuracy: 0.2335 - val_loss: 2.4422
Epoch 6/15
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 205s 158ms/step - accuracy: 0.2290 - loss: 2.4241 - val_accuracy: 0.2421 - val_loss:

<keras.src.callbacks.history.History at 0x78eed0b7e950>

In [None]:
import numpy as np

spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  tokenized_input_sentence = source_vectorization([input_sentence])
  decoded_sentence = '[start]'
  for i in range(max_decoded_sentence_length):
    tokenized_target_sentence = target_vectorization([decoded_sentence])
    next_token_predictions = seq2seq_rnn.predict(
        [tokenized_input_sentence, tokenized_target_sentence])
    sampled_token_index = np.argmax(next_token_predictions[0, i, :])

    sampled_token = spa_index_lookup[sampled_token_index]
    decoded_sentence  += " " + sampled_token
    if sampled_token == "[end]":
      break
  return decoded_sentence


test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
  input_sentence = random.choice(test_eng_texts)
  print("-")
  print(input_sentence)
  print(decode_sequence(input_sentence))

-
Teach them how to make a salad.
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 21ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 19ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 20ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 19ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 20ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 20ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 19ms/step
[start] para hacer cómo hacer una decisión [end]
-
Is there an app for that?
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 20ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 19ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 19ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 19ms/step
1/1 ━━━━━━━━━━━━━━━━━━━━ 0s 26ms/step
1/1 ━━━━━━━━

In [26]:
test_ds = make_dataset(test_pairs)
# Assumes the trained model was saved earlier as "seq2seq_rnn.keras";
# the corresponding save call does not appear in this notebook.
seq2seq_rnn = keras.models.load_model("seq2seq_rnn.keras")
seq2seq_rnn.evaluate(test_ds)

279/279 ━━━━━━━━━━━━━━━━━━━━ 16s 53ms/step - accuracy: 0.1278 - loss: 6.4250


[6.431272506713867, 0.12756389379501343]

# Sequence-to-sequence model with a Transformer

## Transformer decoder


In [None]:
class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.supports_masking = True

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1),
             tf.constant([1, 1], dtype=tf.int32)], axis=0)
        return tf.tile(mask, mult)

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = tf.cast(
                mask[:, tf.newaxis, :], dtype="int32")
            padding_mask = tf.minimum(padding_mask, causal_mask)
        else:
            padding_mask = mask
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=causal_mask)
        attention_output_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            query=attention_output_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
        )
        attention_output_2 = self.layernorm_2(
            attention_output_1 + attention_output_2)
        proj_output = self.dense_proj(attention_output_2)
        return self.layernorm_3(attention_output_2 + proj_output)
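
The mask built by `get_causal_attention_mask` is a lower-triangular matrix: row `i` marks which positions `j` query `i` may attend to. The plain-Python sketch below reproduces that pattern and the `tf.minimum` combination with a padding mask from `call`. The function names and the padding example are made up for illustration.

```python
def causal_mask(seq_len):
    """mask[i][j] == 1 when query position i may attend to key position j (j <= i)."""
    return [[1 if i >= j else 0 for j in range(seq_len)] for i in range(seq_len)]

def combine_with_padding(mask, padding):
    """Element-wise minimum of a (seq_len, seq_len) mask and a per-key padding mask."""
    return [[min(m, p) for m, p in zip(row, padding)] for row in mask]

for row in causal_mask(4):
    print(row)

# Hypothetical padding mask: the last position of a 4-token sequence is padding.
for row in combine_with_padding(causal_mask(4), [1, 1, 1, 0]):
    print(row)
```

Taking the minimum means a position is attended to only if both masks allow it.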

In [None]:
class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential(
            [layers.Dense(dense_dim, activation="relu"),
             layers.Dense(embed_dim),]
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.supports_masking = True

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(
            inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "dense_dim": self.dense_dim,
        })
        return config

In [None]:
class MaskingLayer(layers.Layer):
  def __init__(self):
    super().__init__()

  def call(self, x):
    return tf.math.not_equal(x, 0)

class PositionalEmbedding(layers.Layer):
  def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
    super().__init__(**kwargs)

    self.token_embeddings = layers.Embedding(
        input_dim = input_dim, output_dim = output_dim)

    self.position_embeddings = layers.Embedding(
        input_dim = sequence_length, output_dim = output_dim)

    self.sequence_length = sequence_length
    self.input_dim = input_dim
    self.output_dim = output_dim
    self.maskinglayer = MaskingLayer()

  def build(self, input_shape):
    # Keras automatically handles building the embeddings, but we can explicitly mark the layer as built
    self.built = True

  def compute_mask(self, inputs, mask = None):
    return self.maskinglayer(inputs)

  def call(self, inputs):
    length = tf.shape(inputs)[-1]
    positions = tf.range(start = 0, limit = length, delta = 1)
    embedded_tokens = self.token_embeddings(inputs)
    embedded_positions = self.position_embeddings(positions)
    return embedded_tokens + embedded_positions



  def get_config(self):
    config = super().get_config()
    config.update({
        "output_dim" : self.output_dim,
        "sequence_length" : self.sequence_length,
        "input_dim" : self.input_dim,
    })
    return config
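
What `PositionalEmbedding` computes can be sketched without Keras: each token's vector is added to the vector for its position, and the mask flags non-zero token ids. The tiny lookup tables below are invented values, not learned embeddings.

```python
# Tiny hypothetical lookup tables: 4-token vocabulary, 3 positions, 2-dim vectors.
token_table = [[0.0, 0.0], [0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]
position_table = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]

def positional_embedding(token_ids):
    """Sum of each token's vector and the vector for that token's position."""
    return [
        [t + p for t, p in zip(token_table[tok], position_table[pos])]
        for pos, tok in enumerate(token_ids)
    ]

def padding_mask(token_ids):
    """True for real tokens, False for padding (id 0), like compute_mask."""
    return [tok != 0 for tok in token_ids]

ids = [2, 1, 0]
print(positional_embedding(ids))
print(padding_mask(ids))
```

Because the position vector is added in, the same token id produces a different vector at each position, which is how the model can tell word order apart.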

In [None]:
embed_dim = 256
dense_dim = 2048
num_heads = 8

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="english")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="spanish")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
x = layers.Dropout(0.5)(x)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)
transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
transformer.summary()

In [None]:
transformer.compile(
    optimizer="rmsprop",
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"])
transformer.fit(train_ds, epochs=30, validation_data=val_ds,)

Epoch 1/30
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 92s 57ms/step - accuracy: 0.1537 - loss: 4.4959 - val_accuracy: 0.2420 - val_loss: 2.5565
Epoch 2/30
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 121s 49ms/step - accuracy: 0.2418 - loss: 2.6023 - val_accuracy: 0.2619 - val_loss: 2.1736
Epoch 3/30
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 77s 46ms/step - accuracy: 0.2623 - loss: 2.2163 - val_accuracy: 0.2708 - val_loss: 2.0438
Epoch 4/30
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 59s 45ms/step - accuracy: 0.2737 - loss: 2.0273 - val_accuracy: 0.2757 - val_loss: 1.9774
Epoch 5/30
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 59s 45ms/step - accuracy: 0.2812 - loss: 1.9127 - val_accuracy: 0.2795 - val_loss: 1.9564
Epoch 6/30
1302/1302 ━━━━━━━━━━━━━━━━━━━━ 82s 45ms/step - accuracy: 0.2865 - loss: 1.8379 - val_accuracy: 0.2801 - val_loss: 1.9615
Epoch 7/30

<keras.src.callbacks.history.History at 0x7f4292a25b90>

In [27]:
transformer.evaluate(test_ds)

279/279 ━━━━━━━━━━━━━━━━━━━━ 5s 17ms/step - accuracy: 0.2789 - loss: 2.4412


[2.4331533908843994, 0.2783428728580475]

Finally, let's use this model to translate previously unseen English sentences from the test set.
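
The translation loop is greedy decoding: start from `[start]`, repeatedly pick the highest-scoring next token, and stop at `[end]` or at a maximum length. The sketch below isolates that loop with a hypothetical stand-in for the model. `VOCAB`, `SCRIPT`, and `toy_next_token_scores` are invented for illustration and simply replay a fixed sentence.

```python
# Hypothetical stand-in for the trained model: maps the tokens decoded so far
# to a score for every word in a tiny vocabulary.
VOCAB = ["", "[UNK]", "[start]", "[end]", "saben", "la", "verdad"]
SCRIPT = {1: "saben", 2: "la", 3: "verdad", 4: "[end]"}

def toy_next_token_scores(decoded_tokens):
    """Give score 1.0 to the scripted next word and 0.0 to everything else."""
    wanted = SCRIPT.get(len(decoded_tokens), "[end]")
    return [1.0 if word == wanted else 0.0 for word in VOCAB]

def greedy_decode(max_len=20):
    decoded = ["[start]"]
    for _ in range(max_len):
        scores = toy_next_token_scores(decoded)
        best = max(range(len(scores)), key=scores.__getitem__)  # argmax
        decoded.append(VOCAB[best])
        if VOCAB[best] == "[end]":
            break
    return " ".join(decoded)

print(greedy_decode())
```

Greedy decoding never revisits an earlier choice, so one wrong token can affect everything that follows it.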

In [30]:
import numpy as np

spa_vocab = target_vectorization.get_vocabulary()
spa_index_lookup = dict(zip(range(len(spa_vocab)), spa_vocab))
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
  tokenized_input_sentence = source_vectorization([input_sentence])
  decoded_sentence = '[start]'
  for i in range(max_decoded_sentence_length):
    tokenized_target_sentence = target_vectorization([decoded_sentence])[:,:-1]
    predictions = transformer(
        [tokenized_input_sentence, tokenized_target_sentence])
    sampled_token_index = np.argmax(predictions[0, i, :])

    sampled_token = spa_index_lookup[sampled_token_index]
    decoded_sentence  += " " + sampled_token
    if sampled_token == "[end]":
      break
  return decoded_sentence


test_eng_texts = [pair[0] for pair in test_pairs]
for _ in range(20):
  input_sentence = random.choice(test_eng_texts)
  print("-")
  print(input_sentence)
  print(decode_sequence(input_sentence))

-
They know the truth.
[start] saben la verdad [end]
-
Why would Tom care?
[start] por qué tom querría a tom [end]
-
Tom removed his wet socks.
[start] tomás se quitó los calcetines [UNK] [end]
-
I went to Kyoto by car.
[start] fui dos kioto a kioto [end]
-
She spent some time in Boston.
[start] ella pasó a alguna vez en boston [end]
-
Tom had no male heir.
[start] tom no tenía [UNK] [UNK] [end]
-
Please say something.
[start] por favor dice algo [end]
-
Can you please stop singing?
[start] puede parar de cantar por favor [end]
-
We met that night.
[start] nos encontramos esa noche [end]
-
The birds were flying in a group.
[start] los pájaros [UNK] en un grupo [end]
-
Do you like black cats?
[start] les gustan los gatos negra [end]
-
I doubt that Tom understands what I tried to tell him.
[start] dudo que [UNK] a tom lo que lo que lo quiera [end]
-
Tom found what he was looking for.
[start] tom encontró lo que estaba buscando [end]
-
Tom thought he was going to get a higher salary.
[sta

In [23]:
# Save the Transformer; `model` is the small LSTM model defined earlier in the notebook.
transformer.save("transformer.keras")