<a href="https://colab.research.google.com/github/hobin-jang/colab_test/blob/master/fake_shakespeare.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

In [None]:
shakespeare_url = "https://homl.info/shakespeare"

In [None]:
filepath = tf.keras.utils.get_file("shakespeare.txt", shakespeare_url)
with open(filepath) as f:
  shakespeare_text = f.read()

In [None]:
"""
모든 글자 정수로 인코딩
keras의 Tokenizer 클래스 사용
char_level = True : 단어 수준 대신 글자 수준 인코딩
"""
tokenizer = tf.keras.preprocessing.text.Tokenizer(char_level=True)
tokenizer.fit_on_texts(shakespeare_text)

In [None]:
tokenizer.texts_to_sequences("first") # 텍스트를 인코딩한 결과

In [None]:
tokenizer.texts_to_sequences(["first"])

In [None]:
max_id = len(tokenizer.word_index) # 고유 글자 개수
dataset_size = tokenizer.document_count # 전체 글자 개수
print(max_id, dataset_size)

In [None]:
tokenizer.word_index

In [None]:
[encoded] = np.array(tokenizer.texts_to_sequences([shakespeare_text])) - 1 # 0부터 인코딩하기 위해

In [None]:
encoded

In [None]:
# 훈련 데이터 : 전체의 90%, 검증, 테스트 데이터 : 나머지
train_size = dataset_size * 90 // 100
# 한 번에 한 글자 반환
dataset = tf.data.Dataset.from_tensor_slices(encoded[:train_size])

In [None]:
"""
전체 글자 : 백만 개 이상의 시퀀스 하나
이를 직접 RNN 훈련 시키면 백만 개의 층이 있는 것과 같다.
그러므로 슬라이싱하여 (window 메서드 이용) 텍스트 윈도우로 나누어 부분 문자열을 이용한 RNN을 진행
(TBPTT)
"""
n_steps = 100
window_length = n_steps + 1
dataset = dataset.window(window_length, shift=1, drop_remainder=True)
# window 메서드는 기본적으로 원도우를 중복하지 않음
# shift=1 : 한 칸씩 옆으로 움직임 0~100, 1~101, ... default=window_length
# drop_remainder = True : 모든 윈도우에 동일한 글자 포함되도록 (여기서는 101개)
# 지정하지 않으면 글자 수 점점 줄여나감 101 > 100 > 99 > ... > 1

In [None]:
"""
window 메서드는 각각 하나의 데이터셋으로 표현되는 윈도우를 포함하는 데이터셋을 만든다. (중첩 데이터)
훈련에는 중첩 데이터셋을 바로 사용할 수 없음 => flat_map 메서드를 이용해 플랫 데이터로 변환
{{1,2},{3,4,5,6}}을 flat 시키면 {1,2,3,4,5,6}
lambda ds: ds.batch(2) : 각 데이터셋에 적용할 변환 함수를 flat_map 메서드에 전달해야 함
위 경우를 전달하면 텐서 2개를 가진 데이터셋으로 변환 
{{1,2},{3,4,5,6}} => {[1,2],[3,4],[5,6]}
"""
dataset = dataset.flat_map(lambda window: window.batch(window_length))
batch_size = 32
dataset = dataset.shuffle(10000).batch(batch_size)
dataset = dataset.map(lambda windows: (windows[:, :-1], windows[:, 1:]))

In [None]:
"""
고유 글자 수 적으므로 원-핫 인코딩 사용
"""
dataset = dataset.map(lambda x_batch, y_batch: (tf.one_hot(x_batch, depth=max_id), y_batch))

In [None]:
dataset = dataset.prefetch(1)

In [None]:
# 모델 만들고 훈련 시키기
model = tf.keras.models.Sequential([
              tf.keras.layers.GRU(128, return_sequences=True, input_shape=[None, max_id],
                                  dropout=0.2, recurrent_dropout=0.2),
              tf.keras.layers.GRU(128, return_sequences=True, dropout=0.2, recurrent_dropout=0.2),
              tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(max_id, activation="softmax"))
])

In [None]:
"""
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam")
history = model.fit(dataset, epochs=20)
"""
# 오래 걸려서 학습 중지

In [None]:
# 전처리 함수
def preprocessing(texts):
  x = np.array(tokenizer.texts_to_sequences(texts)) - 1
  return tf.one_hot(x, max_id)

In [None]:
# 간단한 다음 글자 예측
# 위에서 학습 안 시키면 이상한 글 나옴
X_new = preprocessing(["How are yo"])
Y_pred = model.predict_classes(X_new)
tokenizer.sequences_to_texts(Y_pred + 1)[0][-1]

In [None]:
# 가짜 셰익스피어 텍스트 만들기
# 초기 텍스트를 입력하고 모델이 가장 가능성 있는 글자 예측
# 이 글자를 텍스트 끝에 추가하고 늘어난 텍스트를 모델에 전달
# 이를 반복 (temperature가 0에 가까울 수록 높은 확률의 글자 택함)
def next_char(text, temperature=1):
  X_new = preprocessing([text])
  y_proba = model.predict(X_new)[0,-1:,:]
  rescaled_logits = tf.math.log(y_proba) / temperature
  char_id = tf.random.categorical(rescaled_logits, num_samples=1) + 1
  return tokenizer.sequences_to_texts(char_id.numpy())[0]

In [None]:
def complete_text(text, n_char=50, temperature=1):
  for _ in range(n_char):
    text += next_char(text, temperature)
  return text

In [None]:
complete_text("t", temperature=0.3)

In [None]:
complete_text("w", temperature=1)

In [None]:
complete_text("e", temperature=2)

In [None]:
# 상태가 있는 RNN (장기 기억 저장)
dataset = tf.data.Dataset.from_tensor_slices(encoded[:train_size])
dataset = dataset.window(window_length, shift=n_steps, drop_remainder=True)
dataset = dataset.flat_map(lambda window : window.batch(window_length))
dataset = dataset.batch(1)
dataset = dataset.map(lambda windows : (windows[:, :-1], windows[:, 1:]))
dataset = dataset.map(lambda x_batch, y_batch: (tf.one_hot(x_batch, depth=max_id), y_batch))
dataset = dataset.prefetch(1)

In [None]:
batch_size = 32
encoded_parts = np.array_split(encoded[:train_size], batch_size)
datasets = []
for encoded_part in encoded_parts:
    dataset = tf.data.Dataset.from_tensor_slices(encoded_part)
    dataset = dataset.window(window_length, shift=n_steps, drop_remainder=True)
    dataset = dataset.flat_map(lambda window: window.batch(window_length))
    datasets.append(dataset)
dataset = tf.data.Dataset.zip(tuple(datasets)).map(lambda *windows: tf.stack(windows))
dataset = dataset.repeat().map(lambda windows: (windows[:, :-1], windows[:, 1:]))
dataset = dataset.map(
    lambda X_batch, Y_batch: (tf.one_hot(X_batch, depth=max_id), Y_batch))
dataset = dataset.prefetch(1)

In [None]:
model = tf.keras.models.Sequential([
              tf.keras.layers.GRU(128, return_sequences=True, stateful=True, dropout=0.2, recurrent_dropout=0.2, 
                                  batch_input_shape=[batch_size, None, max_id]),
              tf. keras.layers.GRU(128, return_sequences=True, stateful=True, dropout=0.2, recurrent_dropout=0.2),
              tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(max_id, activation="softmax"))
])

In [None]:
class ResetStatesCallback(tf.keras.callbacks.Callback):
  def on_epoch_begin(self, epoch, logs):
    self.model.reset_states()

In [None]:
"""
model.compile(loss="sparse_categorical_crossentropy", optimizer="adam")
steps_per_epoch = train_size // batch_size // n_steps
model.fit(dataset, steps_per_epoch=steps_per_epoch, epochs=50, callbacks=[ResetStatesCallback()])
"""
# 오래 걸려서 학습 중단