**7장 – RNN과 어텐션을 사용한 자연어 처리**

# 설정

이 프로젝트에는 Python 3.7 이상이 필요합니다:

In [None]:
import sys

assert sys.version_info >= (3, 7)

그리고 TensorFlow ≥ 2.8:

In [None]:
from packaging import version
import tensorflow as tf

assert version.parse(tf.__version__) >= version.parse("2.8.0")

이전 챕터에서 했던 것처럼 기본 글꼴 크기를 정의하여 그림을 더 예쁘게 만들어 보겠습니다:

In [None]:
import matplotlib.pyplot as plt

plt.rc('font', size=14)
plt.rc('axes', labelsize=14, titlesize=14)
plt.rc('legend', fontsize=14)
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=10)

import sys
# 코랩의 경우 나눔 폰트를 설치합니다.
if 'google.colab' in sys.modules:
    !sudo apt-get -qq -y install fonts-nanum
    import matplotlib.font_manager as fm
    font_files = fm.findSystemFonts(fontpaths=['/usr/share/fonts/truetype/nanum'])
    for fpath in font_files:
        fm.fontManager.addfont(fpath)

# 나눔 폰트를 사용합니다.
import matplotlib

matplotlib.rc('font', family='NanumBarunGothic')
matplotlib.rcParams['axes.unicode_minus'] = False

그리고 `images/nlp` 폴더를 만들고(아직 존재하지 않는 경우), 이 노트북을 통해 책에 사용할 그림을 고해상도로 저장하는 데 사용되는 `save_fig()` 함수를 정의해 보겠습니다:

In [None]:
from pathlib import Path

IMAGES_PATH = Path() / "images" / "nlp"
IMAGES_PATH.mkdir(parents=True, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = IMAGES_PATH / f"{fig_id}.{fig_extension}"
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

이 챕터는 GPU가 없으면 매우 느려질 수 있으므로 GPU가 있는지 확인하거나 그렇지 않으면 경고를 표시합니다:

In [None]:
if not tf.config.list_physical_devices('GPU'):
    print("GPU가 감지되지 않았습니다. 신경망은 GPU가 없으면 매우 느릴 수 있습니다.")
    if "google.colab" in sys.modules:
        print("런타임 > 런타임 유형 변경으로 이동하여 하드웨어 가속기에서 GPU를 선택합니다.")

# char-RNN을 사용하여 셰익스피어 같은 텍스트 생성하기

## 훈련 데이터셋 생성하기

안드레이 카파시의 [char-rnn 프로젝트](https://github.com/karpathy/char-rnn/)에서 셰익스피어 데이터를 다운로드해 보겠습니다.

In [None]:
import tensorflow as tf

shakespeare_url = "https://homl.info/shakespeare"  # 단축 URL
filepath = tf.keras.utils.get_file("shakespeare.txt", shakespeare_url)
with open(filepath) as f:
    shakespeare_text = f.read()

In [None]:
# 추가 코드 - 짧은 텍스트 샘플을 표시합니다.
print(shakespeare_text[:80])

In [None]:
# 추가 코드 - 39개의 고유 문자를 모두 표시합니다(소문자로 변환 후).
"".join(sorted(set(shakespeare_text.lower())))

In [None]:
text_vec_layer = tf.keras.layers.TextVectorization(split="character",
                                                   standardize="lower")
text_vec_layer.adapt([shakespeare_text])
encoded = text_vec_layer([shakespeare_text])[0]

In [None]:
encoded -= 2  # 토큰 0(패딩)과 1(알 수 없음)을 드롭하는데, 이 토큰은 사용하지 않습니다.
n_tokens = text_vec_layer.vocabulary_size() - 2  # 고유 문자 수 = 39
dataset_size = len(encoded)  # 총 문자 수 = 1,115,394

In [None]:
n_tokens

In [None]:
dataset_size

In [None]:
def to_dataset(sequence, length, shuffle=False, seed=None, batch_size=32):
    ds = tf.data.Dataset.from_tensor_slices(sequence)
    ds = ds.window(length + 1, shift=1, drop_remainder=True)
    ds = ds.flat_map(lambda window_ds: window_ds.batch(length + 1))
    if shuffle:
        ds = ds.shuffle(100_000, seed=seed)
    ds = ds.batch(batch_size)
    return ds.map(lambda window: (window[:, :-1], window[:, 1:])).prefetch(1)

In [None]:
# 추가 코드 - to_dataset()을 사용하는 간단한 예제
# 이 데이터셋에는 하나의 샘플만 있습니다. 입력은 "to b"이고 출력은 "o be"입니다.
list(to_dataset(text_vec_layer(["To be"])[0], length=4))

In [None]:
length = 100
tf.random.set_seed(42)
train_set = to_dataset(encoded[:1_000_000], length=length, shuffle=True,
                       seed=42)
valid_set = to_dataset(encoded[1_000_000:1_060_000], length=length)
test_set = to_dataset(encoded[1_060_000:], length=length)

## Char-RNN 모델 구축 및 훈련하기

**경고**: 다음 코드는 GPU에 따라 실행하는 데 1~2시간이 걸릴 수 있습니다. GPU가 없는 경우 24시간 이상 걸릴 수 있습니다. 기다리지 않으려면 다음 두 코드 셀을 건너뛰고 아래 코드를 실행하여 사전 학습된 모델을 다운로드하세요.

**참고**: (GPU가 있는 경우) `GRU` 클래스는 다음 매개변수의 기본값을 사용할 때 cuDNN 가속을 사용합니다: `activation`, `recurrent_activation`, `recurrent_dropout`, `unroll`, `use_bias`, `reset_after`.

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=n_tokens, output_dim=16),
    tf.keras.layers.GRU(128, return_sequences=True),
    tf.keras.layers.Dense(n_tokens, activation="softmax")
])
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
model_ckpt = tf.keras.callbacks.ModelCheckpoint(
    "my_shakespeare_model", monitor="val_accuracy", save_best_only=True)
history = model.fit(train_set, validation_data=valid_set, epochs=10,
                    callbacks=[model_ckpt])

In [None]:
shakespeare_model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Lambda(lambda X: X - 2),  # <PAD>나 <UNK> 토큰 없음
    model
])

훈련이 완료될 때까지 기다리기 싫으시다면 모델을 미리 훈련해 두었습니다. 다음 코드에서 다운로드할 수 있습니다. 위에서 학습된 모델 대신 이 모델을 사용하려면 마지막 줄의 주석 처리를 해제하세요.

In [None]:
# 추가 코드 - 사전 훈련된 모델 다운로드
url = "https://github.com/ageron/data/raw/main/shakespeare_model.tgz"
path = tf.keras.utils.get_file("shakespeare_model.tgz", url, extract=True)
model_path = Path(path).with_name("shakespeare_model")
shakespeare_model = tf.keras.models.load_model(model_path)

In [None]:
y_proba = shakespeare_model.predict(["To be or not to b"])[0, -1]
y_pred = tf.argmax(y_proba)  # 가장 가능성이 높은 문자 ID 선택
text_vec_layer.get_vocabulary()[y_pred + 2]

## 가짜 셰익스피어 텍스트 생성하기

In [None]:
log_probas = tf.math.log([[0.5, 0.4, 0.1]])  # 확률 = 50%, 40%, 10%
tf.random.set_seed(42)
tf.random.categorical(log_probas, num_samples=8)  # 샘플 8개를 뽑습니다.

In [None]:
def next_char(text, temperature=1):
    y_proba = shakespeare_model.predict([text])[0, -1:]
    rescaled_logits = tf.math.log(y_proba) / temperature
    char_id = tf.random.categorical(rescaled_logits, num_samples=1)[0, 0]
    return text_vec_layer.get_vocabulary()[char_id + 2]

In [None]:
def extend_text(text, n_chars=50, temperature=1):
    for _ in range(n_chars):
        text += next_char(text, temperature)
    return text

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장

In [None]:
print(extend_text("To be or not to be", temperature=0.01))

In [None]:
print(extend_text("To be or not to be", temperature=1))

In [None]:
print(extend_text("To be or not to be", temperature=100))

## 상태가 있는 RNN

In [None]:
def to_dataset_for_stateful_rnn(sequence, length):
    ds = tf.data.Dataset.from_tensor_slices(sequence)
    ds = ds.window(length + 1, shift=length, drop_remainder=True)
    ds = ds.flat_map(lambda window: window.batch(length + 1)).batch(1)
    return ds.map(lambda window: (window[:, :-1], window[:, 1:])).prefetch(1)

stateful_train_set = to_dataset_for_stateful_rnn(encoded[:1_000_000], length)
stateful_valid_set = to_dataset_for_stateful_rnn(encoded[1_000_000:1_060_000],
                                                 length)
stateful_test_set = to_dataset_for_stateful_rnn(encoded[1_060_000:], length)

In [None]:
# 추가 코드 - to_dataset_for_stateful_rnn()을 사용한 간단한 예제
list(to_dataset_for_stateful_rnn(tf.range(10), 3))

배치당 두 개 이상의 윈도가 있다면 `to_dataset_for_stateful_rnn()` 대신 `to_batched_dataset_for_stateful_rnn()` 함수를 사용할 수 있습니다:

In [None]:
# 추가 코드 - 상태가 있는 RNN을 위해 배치 데이터셋을 준비하는 한 가지 방법을 보여줍니다.

import numpy as np

def to_non_overlapping_windows(sequence, length):
    ds = tf.data.Dataset.from_tensor_slices(sequence)
    ds = ds.window(length + 1, shift=length, drop_remainder=True)
    return ds.flat_map(lambda window: window.batch(length + 1))

def to_batched_dataset_for_stateful_rnn(sequence, length, batch_size=32):
    parts = np.array_split(sequence, batch_size)
    datasets = tuple(to_non_overlapping_windows(part, length) for part in parts)
    ds = tf.data.Dataset.zip(datasets).map(lambda *windows: tf.stack(windows))
    return ds.map(lambda window: (window[:, :-1], window[:, 1:])).prefetch(1)

list(to_batched_dataset_for_stateful_rnn(tf.range(20), length=3, batch_size=2))

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
model = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=n_tokens, output_dim=16,
                              batch_input_shape=[1, None]),
    tf.keras.layers.GRU(128, return_sequences=True, stateful=True),
    tf.keras.layers.Dense(n_tokens, activation="softmax")
])

In [None]:
class ResetStatesCallback(tf.keras.callbacks.Callback):
    def on_epoch_begin(self, epoch, logs):
        self.model.reset_states()

In [None]:
# 추가 코드 - 다른 디렉터리를 사용하여 체크포인트를 저장합니다.
model_ckpt = tf.keras.callbacks.ModelCheckpoint(
    "my_stateful_shakespeare_model",
    monitor="val_accuracy",
    save_best_only=True)

**경고**: 다음 셀을 실행하는 데 시간이 걸릴 수 있습니다(GPU를 사용하지 않는 경우 1시간 정도 소요될 수 있음).

In [None]:
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
history = model.fit(stateful_train_set, validation_data=stateful_valid_set,
                    epochs=10, callbacks=[ResetStatesCallback(), model_ckpt])

**추가 자료: 상태가 있는 RNN을 상태가 없는 RNN으로 변환하여 사용하기**

다른 배치 크기로 모델을 사용하려면 상태가 없는 모델 복사본을 만들어야 합니다:

In [None]:
stateless_model = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=n_tokens, output_dim=16),
    tf.keras.layers.GRU(128, return_sequences=True),
    tf.keras.layers.Dense(n_tokens, activation="softmax")
])

가중치를 설정하려면 먼저 모델을 빌드해야 합니다(가중치가 생성되도록):

In [None]:
stateless_model.build(tf.TensorShape([None, None]))

In [None]:
stateless_model.set_weights(model.get_weights())

In [None]:
shakespeare_model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Lambda(lambda X: X - 2),  # <PAD>나 <UNK> 토큰 없음
    stateless_model
])

In [None]:
tf.random.set_seed(42)

print(extend_text("to be or not to be", temperature=0.01))

# 감성 분석

In [None]:
import tensorflow_datasets as tfds

raw_train_set, raw_valid_set, raw_test_set = tfds.load(
    name="imdb_reviews",
    split=["train[:90%]", "train[90%:]", "test"],
    as_supervised=True
)
tf.random.set_seed(42)
train_set = raw_train_set.shuffle(5000, seed=42).batch(32).prefetch(1)
valid_set = raw_valid_set.batch(32).prefetch(1)
test_set = raw_test_set.batch(32).prefetch(1)

In [None]:
for review, label in raw_train_set.take(4):
    print(review.numpy().decode("utf-8")[:200], "...")
    print("레이블:", label.numpy())

In [None]:
vocab_size = 1000
text_vec_layer = tf.keras.layers.TextVectorization(max_tokens=vocab_size)
text_vec_layer.adapt(train_set.map(lambda reviews, labels: reviews))

**경고**: 다음 셀은 실행하는 데 몇 분 정도 걸리며 패딩 토큰을 마스킹하지 않았기 때문에 모델이 아무것도 학습하지 못할 수 있습니다(다음 섹션의 요점입니다).

In [None]:
embed_size = 128
tf.random.set_seed(42)
model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Embedding(vocab_size, embed_size),
    tf.keras.layers.GRU(128),
    tf.keras.layers.Dense(1, activation="sigmoid")
])
model.compile(loss="binary_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
history = model.fit(train_set, validation_data=valid_set, epochs=2)

## 마스킹

**경고**: 다음 셀을 실행하는 데 시간이 걸립니다(GPU를 사용하지 않는 경우 30분 정도 소요될 수 있습니다).

In [None]:
embed_size = 128
tf.random.set_seed(42)
model = tf.keras.Sequential([
    text_vec_layer,
    tf.keras.layers.Embedding(vocab_size, embed_size, mask_zero=True),
    tf.keras.layers.GRU(128),
    tf.keras.layers.Dense(1, activation="sigmoid")
])
model.compile(loss="binary_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
history = model.fit(train_set, validation_data=valid_set, epochs=5)

또는 수동 마스킹을 사용합니다:

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
inputs = tf.keras.layers.Input(shape=[], dtype=tf.string)
token_ids = text_vec_layer(inputs)
mask = tf.math.not_equal(token_ids, 0)
Z = tf.keras.layers.Embedding(vocab_size, embed_size)(token_ids)
Z = tf.keras.layers.GRU(128, dropout=0.2)(Z, mask=mask)
outputs = tf.keras.layers.Dense(1, activation="sigmoid")(Z)
model = tf.keras.Model(inputs=[inputs], outputs=[outputs])

**경고**: 다음 셀을 실행하는 데 시간이 걸립니다(GPU를 사용하지 않는 경우 30분 정도 소요될 수 있습니다).

In [None]:
# 추가 코드 - 평소와 같이 모델을 컴파일하고 훈련합니다.
model.compile(loss="binary_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
history = model.fit(train_set, validation_data=valid_set, epochs=5)

**추가 자료: 래그드 텐서 사용하기**

In [None]:
text_vec_layer_ragged = tf.keras.layers.TextVectorization(
    max_tokens=vocab_size, ragged=True)
text_vec_layer_ragged.adapt(train_set.map(lambda reviews, labels: reviews))
text_vec_layer_ragged(["Great movie!", "This is DiCaprio's best role."])

In [None]:
text_vec_layer(["Great movie!", "This is DiCaprio's best role."])

**경고**: 다음 셀을 실행하는 데 시간이 걸립니다(GPU를 사용하지 않는 경우 30분 정도 소요될 수 있습니다).

In [None]:
embed_size = 128
tf.random.set_seed(42)
model = tf.keras.Sequential([
    text_vec_layer_ragged,
    tf.keras.layers.Embedding(vocab_size, embed_size),
    tf.keras.layers.GRU(128),
    tf.keras.layers.Dense(1, activation="sigmoid")
])
model.compile(loss="binary_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
history = model.fit(train_set, validation_data=valid_set, epochs=5)

## 사전 훈련된 임베딩 및 언어 모델 재사용하기

**경고**: 다음 셀을 실행하는 데 시간이 걸릴 수 있습니다(GPU를 사용하지 않는 경우 1시간 정도 소요될 수 있음).

In [None]:
import os
import tensorflow_hub as hub

os.environ["TFHUB_CACHE_DIR"] = "my_tfhub_cache"
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
model = tf.keras.Sequential([
    # trainable=True로 할 경우 코랩에서 메모리 부족 에러가 발생합니다.
    hub.KerasLayer("https://tfhub.dev/google/universal-sentence-encoder/4",
                   trainable=False, dtype=tf.string, input_shape=[]),
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid")
])
model.compile(loss="binary_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
model.fit(train_set, validation_data=valid_set, epochs=10)

# 신경망 기계 번역을 위한 인코더-디코더 네트워크

In [None]:
url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip"
path = tf.keras.utils.get_file("spa-eng.zip", origin=url, cache_dir="datasets",
                               extract=True)
text = (Path(path).with_name("spa-eng") / "spa.txt").read_text()

In [None]:
import numpy as np

text = text.replace("¡", "").replace("¿", "")
pairs = [line.split("\t") for line in text.splitlines()]
np.random.seed(42)  # 추가 코드 - CPU에서 재현성 보장
np.random.shuffle(pairs)
sentences_en, sentences_es = zip(*pairs)  # 쌍을 2개의 리스트로 분리합니다.

In [None]:
for i in range(3):
    print(sentences_en[i], "=>", sentences_es[i])

In [None]:
vocab_size = 1000
max_length = 50
text_vec_layer_en = tf.keras.layers.TextVectorization(
    vocab_size, output_sequence_length=max_length)
text_vec_layer_es = tf.keras.layers.TextVectorization(
    vocab_size, output_sequence_length=max_length)
text_vec_layer_en.adapt(sentences_en)
text_vec_layer_es.adapt([f"startofseq {s} endofseq" for s in sentences_es])

In [None]:
text_vec_layer_en.get_vocabulary()[:10]

In [None]:
text_vec_layer_es.get_vocabulary()[:10]

In [None]:
X_train = tf.constant(sentences_en[:100_000])
X_valid = tf.constant(sentences_en[100_000:])
X_train_dec = tf.constant([f"startofseq {s}" for s in sentences_es[:100_000]])
X_valid_dec = tf.constant([f"startofseq {s}" for s in sentences_es[100_000:]])
Y_train = text_vec_layer_es([f"{s} endofseq" for s in sentences_es[:100_000]])
Y_valid = text_vec_layer_es([f"{s} endofseq" for s in sentences_es[100_000:]])

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
encoder_inputs = tf.keras.layers.Input(shape=[], dtype=tf.string)
decoder_inputs = tf.keras.layers.Input(shape=[], dtype=tf.string)

In [None]:
embed_size = 128
encoder_input_ids = text_vec_layer_en(encoder_inputs)
decoder_input_ids = text_vec_layer_es(decoder_inputs)
encoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size,
                                                    mask_zero=True)
decoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size,
                                                    mask_zero=True)
encoder_embeddings = encoder_embedding_layer(encoder_input_ids)
decoder_embeddings = decoder_embedding_layer(decoder_input_ids)

In [None]:
encoder = tf.keras.layers.LSTM(512, return_state=True)
encoder_outputs, *encoder_state = encoder(encoder_embeddings)

In [None]:
decoder = tf.keras.layers.LSTM(512, return_sequences=True)
decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state)

In [None]:
output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax")
Y_proba = output_layer(decoder_outputs)

**경고**: 다음 셀을 실행하는 데 시간이 걸릴 수 있습니다(GPU를 사용하지 않는 경우 몇 시간이 걸릴 수 있습니다).

In [None]:
model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs],
                       outputs=[Y_proba])
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
model.fit((X_train, X_train_dec), Y_train, epochs=10,
          validation_data=((X_valid, X_valid_dec), Y_valid))

In [None]:
def translate(sentence_en):
    translation = ""
    for word_idx in range(max_length):
        X = np.array([sentence_en])  # encoder input
        X_dec = np.array(["startofseq " + translation])  # decoder input
        y_proba = model.predict((X, X_dec))[0, word_idx]  # last token's probas
        predicted_word_id = np.argmax(y_proba)
        predicted_word = text_vec_layer_es.get_vocabulary()[predicted_word_id]
        if predicted_word == "endofseq":
            break
        translation += " " + predicted_word
    return translation.strip()

In [None]:
translate("I like soccer")

멋지네요! 그러나 이 모델은 긴 문장을 처리하는 데 어려움을 겪습니다:

In [None]:
translate("I like soccer and also going to the beach")

## 양방향 RNN

양방향 순환 층을 만들려면 일반 순환 층을 `Bidirectional` 층으로 감싸면 됩니다:

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
encoder = tf.keras.layers.Bidirectional(
    tf.keras.layers.LSTM(256, return_state=True))

In [None]:
encoder_outputs, *encoder_state = encoder(encoder_embeddings)
encoder_state = [tf.concat(encoder_state[::2], axis=-1),  # 단기 상태 (0 & 2)
                 tf.concat(encoder_state[1::2], axis=-1)]  # 장기 상태 (1 & 3)

**경고**: 다음 셀을 실행하는 데 시간이 걸릴 수 있습니다(GPU를 사용하지 않는 경우 몇 시간이 걸릴 수 있습니다).

In [None]:
# 추가 코드 - 모델을 완성하고 학습시킵니다.
decoder = tf.keras.layers.LSTM(512, return_sequences=True)
decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state)
output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax")
Y_proba = output_layer(decoder_outputs)
model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs],
                       outputs=[Y_proba])
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
model.fit((X_train, X_train_dec), Y_train, epochs=10,
          validation_data=((X_valid, X_valid_dec), Y_valid))

In [None]:
translate("I like soccer")

## 빔 검색

이것은 빔 검색의 매우 기본적인 구현입니다. 읽기 쉽고 이해하기 쉽게 만들려고 노력했지만 속도에 최적화되지는 않았습니다! 이 함수는 먼저 모델을 사용하여 번역을 시작하기 위해 상위 _k_ 단어를 찾습니다(여기서 _k_는 빔 너비). 상위 _k_ 번역 각각에 대해 해당 번역에 추가할 수 있는 모든 가능한 단어의 조건부 확률을 평가합니다. 이러한 확장 번역과 해당 확률이 후보 목록에 추가됩니다. 모든 상위 _k_ 번역과 번역을 완성할 수 있는 모든 단어를 검토한 후에는 확률이 가장 높은 상위 _k_ 후보만 유지하고 모든 번역이 EOS 토큰으로 완료될 때까지 반복해서 반복합니다. 그런 다음 상위 번역이 반환됩니다(해당 EOS 토큰을 제거한 후).

* 참고: p(S)가 문장 S의 확률이고 p(W|S)가 번역이 S로 시작한다는 가정 하에 단어 W의 조건부 확률인 경우, 문장 S' = concat(S, W)의 확률은 p(S') = p(S) * p(W|S)입니다. 단어를 더 추가할수록 확률은 점점 더 작아집니다. 너무 작아져 부동 소수점 정밀도 오류가 발생할 수 있는 위험을 피하기 위해 이 함수는 확률 대신 로그 확률을 추적합니다. log(a\*b) = log(a) + log(b), 따라서 log(p(S')) = log(p(S)) + log(p(W|S)).

In [None]:
# 추가 코드 - 빔 검색의 기본 구현

def beam_search(sentence_en, beam_width, verbose=False):
    X = np.array([sentence_en])  # 인코더 입력
    X_dec = np.array(["startofseq"])  # 디코더 입력
    y_proba = model.predict((X, X_dec))[0, 0]  # 첫 번째 토큰의 확률
    top_k = tf.math.top_k(y_proba, k=beam_width)
    top_translations = [  # 촤상의 (log_proba, translation) 리스트
        (np.log(word_proba), text_vec_layer_es.get_vocabulary()[word_id])
        for word_proba, word_id in zip(top_k.values, top_k.indices)
    ]

    # 추가 코드 - verbose 모드에서 상위 첫 단어를 표시합니다.
    if verbose:
        print("상위 첫 단어:", top_translations)

    for idx in range(1, max_length):
        candidates = []
        for log_proba, translation in top_translations:
            if translation.endswith("endofseq"):
                candidates.append((log_proba, translation))
                continue  # 번역이 완료되었으므로 번역을 이어가지 않습니다.
            X = np.array([sentence_en])  # 인코더 입력
            X_dec = np.array(["startofseq " + translation])  # 디코더 입력
            y_proba = model.predict((X, X_dec))[0, idx]  # 마지막 토큰의 확률
            for word_id, word_proba in enumerate(y_proba):
                word = text_vec_layer_es.get_vocabulary()[word_id]
                candidates.append((log_proba + np.log(word_proba),
                                   f"{translation} {word}"))
        top_translations = sorted(candidates, reverse=True)[:beam_width]

        # 추가 코드 - verbose 모드의 경우 지금까지의 최상의 번역을 출력합니다.
        if verbose:
            print("지금까지 최상의 번역:", top_translations)

        if all([tr.endswith("endofseq") for _, tr in top_translations]):
            return top_translations[0][1].replace("endofseq", "").strip()

In [None]:
# 추가 코드 - 모델이 어떻게 오류를 발생시키는지 보여줍니다.
sentence_en = "I love cats and dogs"
translate(sentence_en)

In [None]:
# 추가 코드 - 빔 검색이 어떻게 도움이 되는지 보여줍니다.
beam_search(sentence_en, beam_width=3, verbose=True)

빔 검색에서 찾은 상위 3개 문장에 올바른 번역이 있지만 첫 번째 번역은 아닙니다. 작은 어휘를 사용하기 때문에 \[UNK] 토큰이 꽤 자주 사용되므로 페널티를 줄 수 있습니다(예를 들어, 빔 검색 함수에서 이 토큰의 확률을 2로 나눕니다.): 이렇게 하면 빔 검색이 이 토큰을 너무 많이 사용하지 않게 됩니다.

# 어텐션 메커니즘

모든 인코더의 출력을 `Attention` 층에 공급해야 하므로 인코더에 `return_sequences=True`를 추가해야 합니다:

In [None]:
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
encoder = tf.keras.layers.Bidirectional(
    tf.keras.layers.LSTM(256, return_sequences=True, return_state=True))

In [None]:
# 추가 코드 - 모델의 이 부분은 이전과 완전히 동일합니다.
encoder_outputs, *encoder_state = encoder(encoder_embeddings)
encoder_state = [tf.concat(encoder_state[::2], axis=-1),  # 단기 (0 & 2)
                 tf.concat(encoder_state[1::2], axis=-1)]  # 장기 (1 & 3)
decoder = tf.keras.layers.LSTM(512, return_sequences=True)
decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state)

마지막으로 'Attention' 층과 출력 층을 추가해 보겠습니다:

In [None]:
attention_layer = tf.keras.layers.Attention()
attention_outputs = attention_layer([decoder_outputs, encoder_outputs])
output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax")
Y_proba = output_layer(attention_outputs)

**경고**: 다음 셀을 실행하는 데 시간이 걸릴 수 있습니다(GPU를 사용하지 않는 경우 몇 시간이 걸릴 수 있습니다).

In [None]:
model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs],
                       outputs=[Y_proba])
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
model.fit((X_train, X_train_dec), Y_train, epochs=10,
          validation_data=((X_valid, X_valid_dec), Y_valid))

In [None]:
translate("I like soccer and also going to the beach")

In [None]:
beam_search("I like soccer and also going to the beach", beam_width=3,
            verbose=True)

## 트랜스포머 구조: 어텐션이 필요한 전부다
### 위치 인코딩

In [None]:
max_length = 50  # 전체 훈련 세트에 있는 최대 길이
embed_size = 128
tf.random.set_seed(42)  # 추가 코드 - CPU에서 재현성 보장
pos_embed_layer = tf.keras.layers.Embedding(max_length, embed_size)
batch_max_len_enc = tf.shape(encoder_embeddings)[1]
encoder_in = encoder_embeddings + pos_embed_layer(tf.range(batch_max_len_enc))
batch_max_len_dec = tf.shape(decoder_embeddings)[1]
decoder_in = decoder_embeddings + pos_embed_layer(tf.range(batch_max_len_dec))

또는 훈련하지 않는 고정 위치 인코딩을 사용할 수도 있습니다:

In [None]:
class PositionalEncoding(tf.keras.layers.Layer):
    def __init__(self, max_length, embed_size, dtype=tf.float32, **kwargs):
        super().__init__(dtype=dtype, **kwargs)
        assert embed_size % 2 == 0, "embed_size must be even"
        p, i = np.meshgrid(np.arange(max_length),
                           2 * np.arange(embed_size // 2))
        pos_emb = np.empty((1, max_length, embed_size))
        pos_emb[0, :, ::2] = np.sin(p / 10_000 ** (i / embed_size)).T
        pos_emb[0, :, 1::2] = np.cos(p / 10_000 ** (i / embed_size)).T
        self.pos_encodings = tf.constant(pos_emb.astype(self.dtype))
        self.supports_masking = True

    def call(self, inputs):
        batch_max_length = tf.shape(inputs)[1]
        return inputs + self.pos_encodings[:, :batch_max_length]

In [None]:
pos_embed_layer = PositionalEncoding(max_length, embed_size)
encoder_in = pos_embed_layer(encoder_embeddings)
decoder_in = pos_embed_layer(decoder_embeddings)

In [None]:
# 추가 코드
figure_max_length = 201
figure_embed_size = 512
pos_emb = PositionalEncoding(figure_max_length, figure_embed_size)
zeros = np.zeros((1, figure_max_length, figure_embed_size), np.float32)
P = pos_emb(zeros)[0].numpy()
i1, i2, crop_i = 100, 101, 150
p1, p2, p3 = 22, 60, 35
fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(9, 5))
ax1.plot([p1, p1], [-1, 1], "k--", label="$p = {}$".format(p1))
ax1.plot([p2, p2], [-1, 1], "k--", label="$p = {}$".format(p2), alpha=0.5)
ax1.plot(p3, P[p3, i1], "bx", label="$p = {}$".format(p3))
ax1.plot(P[:,i1], "b-", label="$i = {}$".format(i1))
ax1.plot(P[:,i2], "r-", label="$i = {}$".format(i2))
ax1.plot([p1, p2], [P[p1, i1], P[p2, i1]], "bo")
ax1.plot([p1, p2], [P[p1, i2], P[p2, i2]], "ro")
ax1.legend(loc="center right", fontsize=14, framealpha=0.95)
ax1.set_ylabel("$P_{(p,i)}$", rotation=0, fontsize=16)
ax1.grid(True, alpha=0.3)
ax1.hlines(0, 0, figure_max_length - 1, color="k", linewidth=1, alpha=0.3)
ax1.axis([0, figure_max_length - 1, -1, 1])
ax2.imshow(P.T[:crop_i], cmap="gray", interpolation="bilinear", aspect="auto")
ax2.hlines(i1, 0, figure_max_length - 1, color="b", linewidth=3)
cheat = 2  # need to raise the red line a bit, or else it hides the blue one
ax2.hlines(i2+cheat, 0, figure_max_length - 1, color="r", linewidth=3)
ax2.plot([p1, p1], [0, crop_i], "k--")
ax2.plot([p2, p2], [0, crop_i], "k--", alpha=0.5)
ax2.plot([p1, p2], [i2+cheat, i2+cheat], "ro")
ax2.plot([p1, p2], [i1, i1], "bo")
ax2.axis([0, figure_max_length - 1, 0, crop_i])
ax2.set_xlabel("$p$", fontsize=16)
ax2.set_ylabel("$i$", rotation=0, fontsize=16)
save_fig("positional_embedding_plot")
plt.show()

### 멀티 헤드 어텐션

In [None]:
N = 2  # 원본 구조는 6
num_heads = 8
dropout_rate = 0.1
n_units = 128  # 피드 포워드 블록의 첫 번째 Dense 층의 유닛 개수
encoder_pad_mask = tf.math.not_equal(encoder_input_ids, 0)[:, tf.newaxis]
Z = encoder_in
for _ in range(N):
    skip = Z
    attn_layer = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)
    Z = attn_layer(Z, value=Z, attention_mask=encoder_pad_mask)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))
    skip = Z
    Z = tf.keras.layers.Dense(n_units, activation="relu")(Z)
    Z = tf.keras.layers.Dense(embed_size)(Z)
    Z = tf.keras.layers.Dropout(dropout_rate)(Z)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

In [None]:
decoder_pad_mask = tf.math.not_equal(decoder_input_ids, 0)[:, tf.newaxis]
causal_mask = tf.linalg.band_part(  # 하삼각행렬을 생성합니다.
    tf.ones((batch_max_len_dec, batch_max_len_dec), tf.bool), -1, 0)

In [None]:
encoder_outputs = Z  # 인코더의 최종 출력을 저장해 보겠습니다.
Z = decoder_in  # 디코더는 자체 입력으로 시작합니다.
for _ in range(N):
    skip = Z
    attn_layer = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)
    Z = attn_layer(Z, value=Z, attention_mask=causal_mask & decoder_pad_mask)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))
    skip = Z
    attn_layer = tf.keras.layers.MultiHeadAttention(
        num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate)
    Z = attn_layer(Z, value=encoder_outputs, attention_mask=encoder_pad_mask)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))
    skip = Z
    Z = tf.keras.layers.Dense(n_units, activation="relu")(Z)
    Z = tf.keras.layers.Dense(embed_size)(Z)
    Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip]))

**경고**: 다음 셀을 실행하는 데 시간이 걸릴 수 있습니다(GPU를 사용하지 않는 경우 2~3시간 정도 소요될 수 있습니다).

In [None]:
Y_proba = tf.keras.layers.Dense(vocab_size, activation="softmax")(Z)
model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs],
                       outputs=[Y_proba])
model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam",
              metrics=["accuracy"])
model.fit((X_train, X_train_dec), Y_train, epochs=10,
          validation_data=((X_valid, X_valid_dec), Y_valid))

In [None]:
translate("I like soccer and also going to the beach")

# 허깅 페이스

코랩에서 실행하는 경우 트랜스포머스 및 데이터셋 라이브러리를 설치합니다:

In [None]:
if "google.colab" in sys.modules:
    %pip install -q -U transformers
    %pip install -q -U datasets

In [None]:
from transformers import pipeline

classifier = pipeline("sentiment-analysis")  # 다른 많은 작업을 사용할 수 있습니다.
result = classifier("The actors were very convincing.")

모델은 매우 편향적일 수 있습니다. 예를 들어, 훈련 데이터와 사용 방법에 따라 일부 국가를 좋아하거나 싫어할 수 있으므로 신중하게 사용하세요:

In [None]:
classifier(["I am from India.", "I am from Iraq."])

In [None]:
model_name = "huggingface/distilbert-base-uncased-finetuned-mnli"
classifier_mnli = pipeline("text-classification", model=model_name)
classifier_mnli("She loves me. [SEP] She loves me not.")

In [None]:
from transformers import AutoTokenizer, TFAutoModelForSequenceClassification

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = TFAutoModelForSequenceClassification.from_pretrained(model_name)

In [None]:
token_ids = tokenizer(["I like soccer. [SEP] We all love soccer!",
                       "Joe lived for a very long time. [SEP] Joe is old."],
                      padding=True, return_tensors="tf")
token_ids

In [None]:
token_ids = tokenizer([("I like soccer.", "We all love soccer!"),
                       ("Joe lived for a very long time.", "Joe is old.")],
                      padding=True, return_tensors="tf")
token_ids

In [None]:
outputs = model(token_ids)
outputs

In [None]:
Y_probas = tf.keras.activations.softmax(outputs.logits)
Y_probas

In [None]:
Y_pred = tf.argmax(Y_probas, axis=1)
Y_pred  # 0 = contradiction, 1 = entailment, 2 = neutral

In [None]:
sentences = [("Sky is blue", "Sky is red"), ("I love her", "She loves me")]
X_train = tokenizer(sentences, padding=True, return_tensors="tf").data
y_train = tf.constant([0, 2])  # contradiction, neutral
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(loss=loss, optimizer="nadam", metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=2)

# 연습문제 해답

## 1. to 7.

## 8.
_연습문제: 호크라이터와 슈미트후버는 LSTM에 관한 [논문](https://homl.info/93)에서 임베딩된 레버 문법을 사용했습니다. 이는 ‘BPBTSXXVPSEPE’와 같은 문자열을 만드는 인공 문법입니다. 이 주제에 대한 제니 오어의 훌륭한 소개(https://homl.info/108)를 확인해보세요. 특정 임베딩된 레버 문법 하나를 선택하고(제니 오어의 페이지에 있는 것과 같은), 그다음에 문자열이 이 문법을 따르는지 아닌지 구별하는 RNN을 훈련해보세요. 먼저 문법에 맞는 문자열 50%와 그렇지 않은 문자열 50%를 담은 훈련 배치를 생성하는 함수를 만들어야 합니다._

먼저 문법에 맞는 문자열을 생성하는 함수가 필요합니다. 이 문법은 각 상태에서 가능한 전이 상태의 리스트입니다. 하나의 전이는 출력할 문자열(또는 생성할 문법)과 다음 상태를 지정합니다.

In [None]:
default_reber_grammar = [
    [("B", 1)],           # (state 0) =B=>(state 1)
    [("T", 2), ("P", 3)], # (state 1) =T=>(state 2) or =P=>(state 3)
    [("S", 2), ("X", 4)], # (state 2) =S=>(state 2) or =X=>(state 4)
    [("T", 3), ("V", 5)], # 등등 ...
    [("X", 3), ("S", 6)],
    [("P", 4), ("V", 6)],
    [("E", None)]]        # (state 6) =E=>(terminal state)

embedded_reber_grammar = [
    [("B", 1)],
    [("T", 2), ("P", 3)],
    [(default_reber_grammar, 4)],
    [(default_reber_grammar, 5)],
    [("T", 6)],
    [("P", 6)],
    [("E", None)]]

def generate_string(grammar):
    state = 0
    output = []
    while state is not None:
        index = np.random.randint(len(grammar[state]))
        production, state = grammar[state][index]
        if isinstance(production, list):
            production = generate_string(grammar=production)
        output.append(production)
    return "".join(output)

기본 레버 문법을 기반으로 몇 가지 문자열을 생성해 보겠습니다:

In [None]:
np.random.seed(42)

for _ in range(25):
    print(generate_string(default_reber_grammar), end=" ")

좋아 보이네요. 이제 임베딩된 레버 문법을 기반으로 몇 가지 문자열을 생성해 보겠습니다:

In [None]:
np.random.seed(42)

for _ in range(25):
    print(generate_string(embedded_reber_grammar), end=" ")

좋네요, 이제 이 문법을 따르지 않는 문자열을 생성할 함수를 만듭니다. 무작위하게 문자열을 만들 수 있지만 그렇게 하면 너무 문제가 쉬워지므로 대신 문법을 따르는 문자열을 만든 후 하나의 문자만 바꾸어 놓도록 하겠습니다:

In [None]:
POSSIBLE_CHARS = "BEPSTVX"

def generate_corrupted_string(grammar, chars=POSSIBLE_CHARS):
    good_string = generate_string(grammar)
    index = np.random.randint(len(good_string))
    good_char = good_string[index]
    bad_char = np.random.choice(sorted(set(chars) - set(good_char)))
    return good_string[:index] + bad_char + good_string[index + 1:]

잘못된 문자열 몇 개를 만들어 보죠:

In [None]:
np.random.seed(42)

for _ in range(25):
    print(generate_corrupted_string(embedded_reber_grammar), end=" ")

문자열을 바로 RNN에 주입할 수는 없기 때문에 어떤 식으로든 인코딩해야 합니다. 한 가지 방법은 각 문자를 원-핫 인코딩하는 것입니다. 또 다른 방식은 임베딩을 사용하는 것입니다. 두 번째 방법을 사용해 보겠습니다(문자 개수가 작다면 원-핫 인코딩도 좋은 선택일 것입니다). 임베딩을 위해 각 문자열을 문자 ID의 시퀀스로 바꾸어야 합니다. 가능한 문자 "BEPSTVX"의 문자열 인덱스를 사용해 이런 작업을 수행하는 함수를 만들어 보겠습니다:

In [None]:
def string_to_ids(s, chars=POSSIBLE_CHARS):
    return [chars.index(c) for c in s]

In [None]:
string_to_ids("BTTTXXVVETE")

이제 50%는 올바른 문자열 50%는 잘못된 문자열로 이루어진 데이터셋을 만듭니다:

In [None]:
def generate_dataset(size):
    good_strings = [
        string_to_ids(generate_string(embedded_reber_grammar))
        for _ in range(size // 2)
    ]
    bad_strings = [
        string_to_ids(generate_corrupted_string(embedded_reber_grammar))
        for _ in range(size - size // 2)
    ]
    all_strings = good_strings + bad_strings
    X = tf.ragged.constant(all_strings, ragged_rank=1)
    y = np.array([[1.] for _ in range(len(good_strings))] +
                 [[0.] for _ in range(len(bad_strings))])
    return X, y

In [None]:
np.random.seed(42)

X_train, y_train = generate_dataset(10000)
X_valid, y_valid = generate_dataset(2000)

첫 번째 훈련 샘플을 확인해 보겠습니다:

In [None]:
X_train[0]

어떤 클래스에 속할까요?

In [None]:
y_train[0]

완벽합니다! 이제 올바른 문자열을 구분할 RNN을 만들 준비가 되었습니다. 간단한 시퀀스 이진 분류기를 만듭니다:

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

embedding_size = 5

model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=[None], dtype=tf.int32, ragged=True),
    tf.keras.layers.Embedding(input_dim=len(POSSIBLE_CHARS),
                              output_dim=embedding_size),
    tf.keras.layers.GRU(30),
    tf.keras.layers.Dense(1, activation="sigmoid")
])
optimizer = tf.keras.optimizers.SGD(learning_rate=0.02, momentum = 0.95,
                                    nesterov=True)
model.compile(loss="binary_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

이제 두 개의 까다로운 문자열로 이 RNN을 테스트해 보죠: 첫 번째는 잘못된 것이고 두 번째는 올바른 것입니다. 이 문자열은 마지막에서 두 번째 글자만 다릅니다. RNN이 이를 맞춘다면 두 번째 문자가 항상 끝에서 두 번째 문자와 같아야 한다는 패턴을 알게 됐다는 것을 의미합니다. 이렇게 하려면 꽤 긴 단기 기억(long short-term memory)이 필요합니다(그래서 GRU 셀을 사용했습니다).

In [None]:
test_strings = ["BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE",
                "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE"]
X_test = tf.ragged.constant([string_to_ids(s) for s in test_strings], ragged_rank=1)

y_proba = model.predict(X_test)
print()
print("레버 문자열일 추정 확률:")
for index, string in enumerate(test_strings):
    print("{}: {:.2f}%".format(string, 100 * y_proba[index][0]))

쨘! 잘 작동하네요. 이 RNN이 매우 높은 신뢰도로 정확한 답을 냈습니다. :)

## 9.
_연습문제: 날짜 문자열 포맷을 변환하는 인코더-디코더 모델을 훈련하세요(예를 들어, ‘April 22, 2019’에서 ‘2019-04-22’로 바꿉니다)._

먼저 데이터셋을 만들어 보죠. 1000-01-01 ~ 9999-12-31 사이의 랜덤한 날짜를 사용하겠습니다:

In [None]:
from datetime import date

# strftime()의 %B 포맷은 로케일에 의존하기 때문에 사용할 수 있습니다.
MONTHS = ["January", "February", "March", "April", "May", "June",
          "July", "August", "September", "October", "November", "December"]

def random_dates(n_dates):
    min_date = date(1000, 1, 1).toordinal()
    max_date = date(9999, 12, 31).toordinal()

    ordinals = np.random.randint(max_date - min_date, size=n_dates) + min_date
    dates = [date.fromordinal(ordinal) for ordinal in ordinals]

    x = [MONTHS[dt.month - 1] + " " + dt.strftime("%d, %Y") for dt in dates]
    y = [dt.isoformat() for dt in dates]
    return x, y

다음은 입력과 출력 형식에 맞춘 랜덤한 몇 개의 날짜입니다:

In [None]:
np.random.seed(42)

n_dates = 3
x_example, y_example = random_dates(n_dates)
print("{:25s}{:25s}".format("Input", "Target"))
print("-" * 50)
for idx in range(n_dates):
    print("{:25s}{:25s}".format(x_example[idx], y_example[idx]))

입력 가능한 전체 문자를 나열해 보죠:

In [None]:
INPUT_CHARS = "".join(sorted(set("".join(MONTHS) + "0123456789, ")))
INPUT_CHARS

그리고 다음은 출력 가능한 전체 문자입니다:

In [None]:
OUTPUT_CHARS = "0123456789-"

이전 연습문제에서처럼 문자열을 문자 ID 리스트로 바꾸는 함수를 작성해 보겠습니다:

In [None]:
def date_str_to_ids(date_str, chars=INPUT_CHARS):
    return [chars.index(c) for c in date_str]

In [None]:
date_str_to_ids(x_example[0], INPUT_CHARS)

In [None]:
date_str_to_ids(y_example[0], OUTPUT_CHARS)

In [None]:
def prepare_date_strs(date_strs, chars=INPUT_CHARS):
    X_ids = [date_str_to_ids(dt, chars) for dt in date_strs]
    X = tf.ragged.constant(X_ids, ragged_rank=1)
    return (X + 1).to_tensor() # 0을 패딩 토큰 ID로 사용

def create_dataset(n_dates):
    x, y = random_dates(n_dates)
    return prepare_date_strs(x, INPUT_CHARS), prepare_date_strs(y, OUTPUT_CHARS)

In [None]:
np.random.seed(42)

X_train, Y_train = create_dataset(10000)
X_valid, Y_valid = create_dataset(2000)
X_test, Y_test = create_dataset(2000)

In [None]:
Y_train[0]

### 첫 번째 버전: 기본적인 seq2seq 모델

먼저 가장 간단한 모델을 시도해 보겠습니다: 입력 시퀀스가 먼저 (임베딩 층 뒤에 하나의 LSTM 층으로 구성된) 인코더를 통과하여 벡터로 출력됩니다. 그 다음 이 벡터가 (하나의 LSTM 층 뒤에 밀집 층으로 구성된) 디코더로 들어가 벡터의 시퀀스를 출력합니다. 각 벡터는 가능한 모든 출력 문자에 대한 추정 확률입니다.

디코더는 시퀀스를 입력으로 기대하기 때문에 가능한 가장 긴 출력 시퀀스만큼 (인코더의 출력) 벡터를 반복합니다.

In [None]:
embedding_size = 32
max_output_length = Y_train.shape[1]

np.random.seed(42)
tf.random.set_seed(42)

encoder = tf.keras.Sequential([
    tf.keras.layers.Embedding(input_dim=len(INPUT_CHARS) + 1,
                           output_dim=embedding_size,
                           input_shape=[None]),
    tf.keras.layers.LSTM(128)
])

decoder = tf.keras.Sequential([
    tf.keras.layers.LSTM(128, return_sequences=True),
    tf.keras.layers.Dense(len(OUTPUT_CHARS) + 1, activation="softmax")
])

model = tf.keras.Sequential([
    encoder,
    tf.keras.layers.RepeatVector(max_output_length),
    decoder
])

optimizer = tf.keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

좋아 보이네요, 100% 검증 정확도를 달성했습니다! 이 모델을 사용해 예측을 만들어 보죠. 문자 ID 시퀀스를 문자열로 바꾸는 함수를 작성하겠습니다:

In [None]:
def ids_to_date_strs(ids, chars=OUTPUT_CHARS):
    return ["".join([("?" + chars)[index] for index in sequence])
            for sequence in ids]

이제 모델을 사용해 샘플 날짜를 변환합니다.

In [None]:
X_new = prepare_date_strs(["September 17, 2009", "July 14, 1789"])

In [None]:
ids = model.predict(X_new).argmax(axis=-1)
for date_str in ids_to_date_strs(ids):
    print(date_str)

완벽합니다! :)

하지만 (가장 긴 날짜에 해당하는) 길이가 18인 입력 문자열에서만 모델이 훈련되었기 때문에 짧은 시퀀스에서는 잘 동작하지 않습니다:

In [None]:
X_new = prepare_date_strs(["May 02, 2020", "July 14, 1789"])

In [None]:
ids = model.predict(X_new).argmax(axis=-1)
for date_str in ids_to_date_strs(ids):
    print(date_str)

이런! 패딩을 사용해 훈련할 때와 동일한 길이의 시퀀스를 전달해야 할 것 같습니다. 이를 위해 헬퍼 함수를 작성해 보죠:

In [None]:
max_input_length = X_train.shape[1]

def prepare_date_strs_padded(date_strs):
    X = prepare_date_strs(date_strs)
    if X.shape[1] < max_input_length:
        X = tf.pad(X, [[0, 0], [0, max_input_length - X.shape[1]]])
    return X

def convert_date_strs(date_strs):
    X = prepare_date_strs_padded(date_strs)
    ids = model.predict(X).argmax(axis=-1)
    return ids_to_date_strs(ids)

In [None]:
convert_date_strs(["May 02, 2020", "July 14, 1789"])

좋네요! 물론 더 쉽게 날짜 변환 도구를 만들 수 있습니다(예를 들면, 정규식이나 더 단순한 문자열 조작). 하지만 신경망을 사용하는 것이 더 멋져 보이네요. ;-)

하지만 실제 시퀀스-투-시퀀스 문제는 더 어렵습니다. 완벽함을 추구하기 위해 더 강력한 모델을 만들어 보겠습니다.

### 두 번째 버전: 디코더에서 한 타임 스텝 이동된 타깃 주입하기(티처 포싱(teacher forcing))

디코더에세 인코더 출력 벡터를 단순히 반복한 것을 주입하는 대신 한 타임 스텝 오른쪽으로 이동된 타깃 시퀀스를 주입할 수 있습니다. 이렇게 하면 각 타임 스텝에서 디코더는 이전 타깃 문자가 무엇인지 알게 됩니다. 이는 더 복잡한 시퀀스-투-시퀀스 문제를 다루는데 도움이 됩니다.

각 타깃 시퀀스의 첫 번째 출력 문자는 이전 문자가 없기 때문에 시퀀스 시작(start-of-sequence, sos)을 나타내는 새로운 토큰이 필요합니다.

추론에서는 타깃을 알지 못하므로 디코더에게 무엇을 주입해야 할까요? sos 토큰을 시작해서 한 번에 하나의 문자를 예측하고 디코더에게 지금까지 예측한 모든 문자를 주입할 수 있습니다(나중에 이 노트북에서 더 자세히 알아 보겠습니다).

하지만 디코더의 LSTM이 스텝마다 이전 타깃을 입력으로 기대한다면 인코더의 벡터 출력을 어떻게 전달할까요? 한가지 방법은 출력 벡터를 무시하는 것입니다. 그리고 대신 인코더의 LSTM 상태를 디코더의 LSTM의 초기 상태로 사용합니다(이렇게 하려면 인코더의 LSTM과 디코더의 LSTM 유닛 개수가 같아야 합니다).

그럼 (훈련, 검증, 테스트를 위한) 디코더의 입력을 만들어 보죠. sos 토큰은 가능한 출력 문자의 마지막 ID + 1으로 나타냅니다.

In [None]:
sos_id = len(OUTPUT_CHARS) + 1

def shifted_output_sequences(Y):
    sos_tokens = tf.fill(dims=(len(Y), 1), value=sos_id)
    return tf.concat([sos_tokens, Y[:, :-1]], axis=1)

X_train_decoder = shifted_output_sequences(Y_train)
X_valid_decoder = shifted_output_sequences(Y_valid)
X_test_decoder = shifted_output_sequences(Y_test)

디코더의 훈련 입력을 확인해 보죠:

In [None]:
X_train_decoder

이제 모델을 만듭니다. 이제 더 이상 간단한 시퀀셜 모델이 아니므로 함수형 API를 사용하겠습니다:

In [None]:
encoder_embedding_size = 32
decoder_embedding_size = 32
lstm_units = 128

np.random.seed(42)
tf.random.set_seed(42)

encoder_input = tf.keras.layers.Input(shape=[None], dtype=tf.int32)
encoder_embedding = tf.keras.layers.Embedding(
    input_dim=len(INPUT_CHARS) + 1,
    output_dim=encoder_embedding_size)(encoder_input)
_, encoder_state_h, encoder_state_c = tf.keras.layers.LSTM(
    lstm_units, return_state=True)(encoder_embedding)
encoder_state = [encoder_state_h, encoder_state_c]

decoder_input = tf.keras.layers.Input(shape=[None], dtype=tf.int32)
decoder_embedding = tf.keras.layers.Embedding(
    input_dim=len(OUTPUT_CHARS) + 2,
    output_dim=decoder_embedding_size)(decoder_input)
decoder_lstm_output = tf.keras.layers.LSTM(lstm_units, return_sequences=True)(
    decoder_embedding, initial_state=encoder_state)
decoder_output = tf.keras.layers.Dense(len(OUTPUT_CHARS) + 1,
                                    activation="softmax")(decoder_lstm_output)

model = tf.keras.Model(inputs=[encoder_input, decoder_input],
                           outputs=[decoder_output])

optimizer = tf.keras.optimizers.Nadam()
model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer,
              metrics=["accuracy"])
history = model.fit([X_train, X_train_decoder], Y_train, epochs=10,
                    validation_data=([X_valid, X_valid_decoder], Y_valid))

이 모델도 100% 검증 정확도를 달성했지만 더 빠릅니다.

이 모델을 사용해 몇 가지 예측을 수행해 보죠. 이번에는 한 문자씩 예측해야 합니다.

In [None]:
sos_id = len(OUTPUT_CHARS) + 1

def predict_date_strs(date_strs):
    X = prepare_date_strs_padded(date_strs)
    Y_pred = tf.fill(dims=(len(X), 1), value=sos_id)
    for index in range(max_output_length):
        pad_size = max_output_length - Y_pred.shape[1]
        X_decoder = tf.pad(Y_pred, [[0, 0], [0, pad_size]])
        Y_probas_next = model.predict([X, X_decoder])[:, index:index+1]
        Y_pred_next = tf.argmax(Y_probas_next, axis=-1, output_type=tf.int32)
        Y_pred = tf.concat([Y_pred, Y_pred_next], axis=1)
    return ids_to_date_strs(Y_pred[:, 1:])

In [None]:
predict_date_strs(["July 14, 1789", "May 01, 2020"])

잘 동작하네요! 다음으로 트랜스포머 버전을 만들어 보세요. :)

## 10.
_문제: 케라스 웹사이트에 있는 "Natural language image search with a Dual Encoder"(https://homl.info/dualtuto) 예제를 살펴보세요. 동일한 임베딩 공간 내에서 이미지와 텍스트를 모두 표현할 수 있는 모델을 만드는 방법을 배우게 됩니다. 이렇게 하면 OpenAI의 [CLIP 모델](https://openai.com/blog/clip/)에서와 같이 텍스트 프롬프트를 사용하여 이미지를 검색할 수 있습니다._

링크를 클릭하고 안내를 따르기만 하면 됩니다.

## 11.
_문제: 허깅 페이스의 트랜스포머스 라이브러리를 사용하여 텍스트를 생성할 수 있는 사전 훈련된 언어 모델(예, GPT)을 다운로드하고 보다 설득력 있는 셰익스피어식 텍스트를 생성해 보세요. 모델의 `generate()` 메서드를 사용해야 합니다. 자세한 내용은 허깅 페이스 온라인 문서를 참조하세요._

먼저 사전 훈련된 모델을 로드해 보겠습니다. 이 예제에서는 추가 언어 모델(입력 임베딩에 가중치가 연결된 선형 층)을 위에 얹은 OpenAI의 GPT 모델을 사용합니다. 임포트하고 사전 훈련된 가중치를 로드해 보겠습니다(이렇게 하면 약 445MB의 데이터가 `~/.cache/torch/transformers`로 다운로드됩니다):

In [None]:
from transformers import TFOpenAIGPTLMHeadModel

model = TFOpenAIGPTLMHeadModel.from_pretrained("openai-gpt")

다음으로 이 모델에 특화된 토크나이저가 필요합니다. 만약 설치되어 있으면 [spaCy](https://spacy.io/) 및 [ftfy](https://pypi.org/project/ftfy/) 라이브러리를 사용하려고 시도하고, 그렇지 않으면 BERT의 `BasicTokenizer`와 바이트 쌍 인코딩(대부분의 사용 사례에 적합할 것입니다)을 사용합니다:

In [None]:
from transformers import OpenAIGPTTokenizer

tokenizer = OpenAIGPTTokenizer.from_pretrained("openai-gpt")

이제 토크나이저를 사용하여 프롬프트 텍스트를 토큰화 및 인코딩해 보겠습니다:

In [None]:
tokenizer("hello everyone")

In [None]:
prompt_text = "This royal throne of kings, this sceptred isle"
encoded_prompt = tokenizer.encode(prompt_text,
                                  add_special_tokens=False,
                                  return_tensors="tf")
encoded_prompt

쉬워요! 다음으로 모델을 사용하여 프롬프트 뒤를 이은 텍스트를 생성해 보겠습니다. 프롬프트 텍스트로 시작하여 각각 5개의 다른 문장을 40개의 토큰 안에서 생성합니다. 모든 하이퍼파라미터의 기능에 대한 설명은 패트릭 폰 플라텐(Hugging Face)의 [블로그 게시물](https://huggingface.co/blog/how-to-generate)을 참조하세요. 하이퍼파라미터를 사용해 더 나은 결과를 얻을 수 있습니다.

In [None]:
num_sequences = 5
length = 40

generated_sequences = model.generate(
    input_ids=encoded_prompt,
    do_sample=True,
    max_length=length + len(encoded_prompt[0]),
    temperature=1.0,
    top_k=0,
    top_p=0.9,
    repetition_penalty=1.0,
    num_return_sequences=num_sequences,
)

generated_sequences

이제 생성된 시퀀스를 디코딩하고 인쇄해 보겠습니다:

In [None]:
for sequence in generated_sequences:
    text = tokenizer.decode(sequence, clean_up_tokenization_spaces=True)
    print(text)
    print("-" * 80)

언어 모델이 위에 있는 변형을 포함하여 트랜스포머 라이브러리에서 사전 학습된 모델로 사용할 수 있는 GPT-2, CTRL, Transformer-XL 또는 XLNet과 같은 최신(및 더 큰) 모델을 사용해 볼 수 있습니다. 전처리 단계는 모델마다 조금씩 다르므로 트랜스포머 문서에서 이 [생성 예제](https://github.com/huggingface/transformers/blob/master/examples/run_generation.py)를 확인하시기 바랍니다(이 예제에서는 파이토치를 사용하지만 모델 클래스 이름 앞에 `TF`를 추가하고, `.to()` 메서드 호출을 제거하고, `"pt"` 대신 `return_tensors="tf"`를 사용하는 등 약간의 조정만 하면 작동합니다).