In [1]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Is this notebook running on Colab or Kaggle?
IS_COLAB = "google.colab" in sys.modules
IS_KAGGLE = "kaggle_secrets" in sys.modules

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

if not tf.config.list_physical_devices('GPU'):
    print("No GPU was detected. LSTMs and CNNs can be very slow without a GPU.")
    if IS_COLAB:
        print("Go to Runtime > Change runtime and select a GPU hardware accelerator.")
    if IS_KAGGLE:
        print("Go to Settings > Accelerator and select GPU.")

# Common imports
import numpy as np
import os
from pathlib import Path

# to make this notebook's output stable across runs
np.random.seed(42)
tf.random.set_seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)

# Where to save the figures
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "rnn"
IMAGES_PATH = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID)
os.makedirs(IMAGES_PATH, exist_ok=True)

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = os.path.join(IMAGES_PATH, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

No GPU was detected. LSTMs and CNNs can be very slow without a GPU.


# 시계열 예측하기
- 타임스템마다 하나의 값을 가지면 **단변량 시계열**, 여러개의 값을 가지면 **다변량 시계열**이다.
#### 

In [2]:
def generate_time_series(batch_size, n_steps):
    freq1, freq2, offsets1, offsets2 = np.random.rand(4, batch_size, 1)
    time = np.linspace(0, 1, n_steps)
    series = 0.5 * np.sin((time - offsets1)*(freq1*10 + 10))
    series += 0.2 * np.sin((time - offsets2)*(freq1*20 + 20))
    series = 0.2 * (np.random.rand(batch_size, n_steps)-0.5)
    return series[..., np.newaxis].astype(np.float32)

# 이 함수는 (batch_size 맥변수로) 요청한 만큼 n_stemps 길이의 여러 시계열을 만듬

In [3]:
# 위 함수를 사용해 훈련 세트, 검증 세트, 테스트 세트를 만듬

n_steps = 50
series = generate_time_series(10000, n_steps + 1)
X_train, y_train = series[:7000, :n_steps], series[:7000, -1]
X_valid, y_valid = series[7000:9000, :n_steps], series[7000:9000, -1]
X_test, y_test = series[9000:, :n_steps], series[9000:, -1]


- RNN을 시작하기전에 기준 선능르 몇개 준비하는 것이 좋음, 그렇지 않으면 실제 기본 모델보다 성능이 나쁠 때도 잘 작동한다고 생각할 수 있음
- 예로 각 시계열이 마지막 값을 그대로 예측하는 것입니다. 이를 **순진한 예측**이라고 부른다.

### 방법 1

In [4]:
y_pred = X_valid[:, -1]
np.mean(keras.losses.mean_squared_error(y_valid, y_pred)) # 평균 제곱 오차 


0.006572011

### 방법 2
### 완전 연결 네트워크를 사용하는 것!
- 이 네트워크는 입력마다 1차원 특성 배열을 기대하기 때문에 Flatten 층을 추가해야 함. 시계열 값의 선형 조합으로 예측하기 위해 간단한 선형 회귀 모델을 사용하겠음

In [5]:
model = keras.models.Sequential([keras.layers.Flatten(input_shape=[50, 1]),
                                keras.layers.Dense(1)])
model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))
model.evaluate(X_valid, y_valid)


# 순진한 예측보다 훨씬 나음, 0.00342

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


0.0034201755188405514

### 간단한 RNN 구현하기

In [6]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.SimpleRNN(1, input_shape=[None, 1])
])

In [7]:
optimizer = keras.optimizers.Adam(learning_rate=0.005)
model.compile(loss="mse", optimizer=optimizer)
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))


Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


- 이 성능은 순진한 예측보다는 낫지만 간단한 선형 모델을 앞지르지 못함 
- 기본 RNN의 순환 뉴런은 입력과 은닉 상태 차원(기본 RNN에서는 층의 순환 뉴럴ㄴ 개수와 같음) 마다 하나의 파라미터를 가지고 편향이 있음
- 기본 RNN에서는 3개의 파라미터가 있음

### 심층 RNN
- 셀을 여러 층으로 쌓는 것이 일반적

In [8]:
np.random.seed(42)
tf.random.set_seed(42)


model = keras.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.SimpleRNN(1)
])

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [9]:
# 모델 평가

model.evaluate(X_valid, y_valid)



0.0034748041070997715

- 이 RNN은 한 타임 스텝에서 다음 타임 스템으로 필요한 모든 정보를 나르기 위해 다른 순환 층의 은닉 상태를 주로 사용
- 마지막 층의 은닉 상태는 크게 필요하지 않음
- SimpleRNN 층은 기본적으로 tanh 활성화 함수를 사용하기 때문에 예측된 값이 -1과 1 사이 범위에 놓입니다. 다른 활성화 함수를 사용하면??
- 이런 이유로 출력층은  Dense층으로 바꾸는 경우가 많음. **더 바르면서 정황도는 거의 비슷함**
- 이렇게 바꾸렴녀 두번재 순환 층에서 return_sequences = True를 제거해야 함

In [10]:
np.random.seed(42)
tf.random.set_seed(42)

# 이 모델을 훈련하면 바르게 수렴하고 성능도 좋음
# 또한 출력층으이 활성화 함수를 원하는 함수로 바꿀 수 있음

model = keras.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.Dense(1)
])

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [11]:
# 모델 평가

model.evaluate(X_valid, y_valid)



0.0034103619400411844

---
### 여러 타임 스텝을 예측하기
- 이전까지는 다음 스텝의 ㄱ밧만 예그했지만 타깃을 적절히 바꾸어 여러 타임 스텝 앞의 값을 손쉽게 예측할 수 있음


#### 첫번째 방법
- 이미 훈련된 모델을 사용하여 다음 값을 예측한 다음 이 값을 입력으로 추가하는 것!

In [12]:
np.random.seed(43)

series = generate_time_series(1, n_steps + 10)
X_new, Y_new = series[:, :n_steps], series[:, n_steps:]
X = X_new

for step_ahead in range(10):
    y_pred_one = model.predict(X[:, step_ahead:])[:, np.newaxis, :]
    X = np.concatenate([X, y_pred_one], axis=1)

Y_pred = X[:, n_steps:]

ValueError: all the input arrays must have same number of dimensions, but the array at index 0 has 3 dimension(s) and the array at index 1 has 4 dimension(s)

In [None]:
Y_pred.shape

In [None]:
def plot_multiple_forecasts(X, Y, Y_pred):
    n_steps = X.shape[1]
    ahead = Y.shape[1]
    plot_series(X[0, :, 0])
    plt.plot(np.arange(n_steps, n_steps + ahead), Y[0, :, 0], "bo-", label="Actual")
    plt.plot(np.arange(n_steps, n_steps + ahead), Y_pred[0, :, 0], "rx-", label="Forecast", markersize=10)
    plt.axis([0, n_steps + ahead, -1, 1])
    plt.legend(fontsize=14)

plot_multiple_forecasts(X_new, Y_new, Y_pred)
save_fig("forecast_ahead_plot")
plt.show()

### 두번째 방법
- RNN을 훈련하여 다음 값 10개를 한 번에 예측하는 것
- 시퀀스-투-벡터 모델을 사용하지만 1개가 아니라 값 10개를 출력

In [21]:
# 먼저 타깃을 다음 10개의 값이 담긴 벡터로 바꾸어야 한다.

np.random.seed(42)

n_steps = 50
series = generate_time_series(10000, n_steps + 10)
X_train, Y_train = series[:7000, :n_steps], series[:7000, -10:, 0]
X_valid, Y_valid = series[7000:9000, :n_steps], series[7000:9000, -10:, 0]
X_test, Y_test = series[9000:, :n_steps], series[9000:, -10:, 0]

In [22]:
# 이제 1개의 유닛이 아니라 10개의 유닛을 가진 출력층이 필요

np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(10)
])

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [28]:
np.random.seed(43)

series = generate_time_series(1, 50 + 10)
X_new, Y_new = series[:, :50, :], series[:, -10:, :]
Y_pred = model.predict(X_new)[..., np.newaxis]
Y_pred

array([[[-0.00258692],
        [ 0.0030446 ],
        [-0.00097259],
        [ 0.000591  ],
        [-0.00680806],
        [-0.00198271],
        [-0.00729517],
        [-0.00539511],
        [ 0.00112137],
        [ 0.00153604]]], dtype=float32)

---
- 마지막 스텝에서만 다음 값 10개를 예측하도록 모델을 훈련하는 대신 모든 타임 스텝에서 다음 값 10개를 예측하도록 모델 훈련
- 시퀀스-투-벡터RNN에서 시퀀스-투 시퀀스RNN으로 바꿈
- 훈련을 안정적으로 만들고 훈련 속도를 높임

In [33]:
# 타깃 시퀀스 준비
np.random.seed(42)

n_steps = 50
series = generate_time_series(10000, n_steps + 10)
X_train = series[:7000, :n_steps]
X_valid = series[7000:9000, :n_steps]
X_test = series[9000:, :n_steps]
Y = np.empty((10000, n_steps, 10))
for step_ahead in range(1, 10 + 1):
    Y[..., step_ahead - 1] = series[..., step_ahead:step_ahead + n_steps, 0]
Y_train = Y[:7000]
Y_valid = Y[7000:9000]
Y_test = Y[9000:]

- 이 모델을 시퀀스-투- 시퀀스 모델로 바꾸려면 모든 순환 층(마지막 층도)에 return_sequences=True로 지정해야 함
- 그 다음 모든 타임 스텝에서 출력을 Dense 층에 적용해야 한다. 케라스는 이런 목적으로  TimeDistributed 층을 제공

In [34]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

# 훈련을 위해 모든 출력에 걸쳐 MSE를 계산
def last_time_step_mse(Y_true, Y_pred):
    return keras.metrics.mean_squared_error(Y_true[:, -1], Y_pred[:, -1])

model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.01), metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [42]:
np.random.seed(43)

series = generate_time_series(1, 50 + 10)
X_new, Y_new = series[:, :50, :], series[:, 50:, :]
Y_pred = model.predict(X_new)[:, -1][..., np.newaxis]

### Deep RNN with Batch Norm

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.BatchNormalization(),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.BatchNormalization(),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

## 사용자 정의 메모리 셀(층 정규화)
####  불안정한 그레디언트 문제 해결

In [48]:
from tensorflow.keras.layers import LayerNormalization


class LNSimpleRNNCell(keras.layers.Layer): # keras.layers.Layer 클라스 상속
    
    def __init__(self, units, activation="tanh", **kwargs):
        super().__init__(**kwargs)
        self.state_size = units
        self.output_size = units
        self.simple_rnn_cell = keras.layers.SimpleRNNCell(units,
                                                          activation=None)
        self.layer_norm = LayerNormalization()
        self.activation = keras.activations.get(activation)
    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        if inputs is not None:
            batch_size = tf.shape(inputs)[0]
            dtype = inputs.dtype
        return [tf.zeros([batch_size, self.state_size], dtype=dtype)]
    def call(self, inputs, states):
        outputs, new_states = self.simple_rnn_cell(inputs, states)
        norm_outputs = self.activation(self.layer_norm(outputs))
        return norm_outputs, [norm_outputs]

In [49]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True,
                     input_shape=[None, 1]),
    keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


#### 단기 기억 문제 해결하기
- RNN을 거치면서 데이터가 변환되무로 일부 정보는 매 훈련 스텝 후 사라짐.  어느 정도 시간이 지나면 RNN 의 상태는 사실상 첫 번째 입력의 흔적을 가지고 있지 않음. 이는 심각한 문제가 될 수 있음
- 장기 메모리를 가진 셀에서 가장 있기 있는 **LSTM셀**을 먼저 사용
- 케라스에서는  간단하게 SimpleRNN층 대신 LSTM 층을 사용하면 된다.

In [50]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.LSTM(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.LSTM(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [51]:
model.evaluate(X_valid, Y_valid)



[0.0033316826447844505, 0.0033199177123606205]

### 범용 목적이 keras.layers.RNN층에 LSTMCell을 매개변수로 지정할 수 있음

In [None]:
model = keras.models.Sequential([
    keras.layers.RNN(keras.layers.LSTMCell(20), return_sequences=True, input_shape=[None, 1]),
    keras.layers.RNN(keras.layers.LSTMCell(20), return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

### 하지만 LSTM층이 GPU에서 실행할 때 최적화된 구현을 사용하므로 일반적으로 선호
- LSTM 셀은 중요한 입력을 인식하고(입력 게이트의 역활), 장기 상태에 저장하고, 필요한 기간동안 이를 보존하고(삭제 게이트의 역활), 필요할 때마다 이를 추출하기 위해 학습
- 그러므로 LSTM 셀은 시계열, 긴 텍스트, 오디오 녹음 등에서 장기 패턴을 잡아내는 데 놀라운 성과를 냄

# GRU 셀
- GRU(gated recurrent unit) 셀은 LSTM셀의 간소화된 버전이고 유사하게 작동하는 것처럼 보임(인기 많음)
- 

In [53]:
# 1D 합성곱 층을 사용해 시퀀스 처리하기

np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.GRU(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.GRU(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [54]:
# 모델 평가

model.evaluate(X_valid, Y_valid)



[0.003331604413688183, 0.0033200255129486322]

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.Conv1D(filters=20, kernel_size=4, strides=2, padding="valid",
                        input_shape=[None, 1]),
    keras.layers.GRU(20, return_sequences=True),
    keras.layers.GRU(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train[:, 3::2], epochs=20,
                    validation_data=(X_valid, Y_valid[:, 3::2]))

## WaveNet

- 첫 번째 합성곱 층이 한 번에 2개의 타임 스텝만 바라 봄, 다음 스텝은 4개, 8개의 타입 스텝을 보는 식
- 팽창 비율을 두 배로 늘린 덕분에 네트워크는 아주 긴 시퀀스를 매우 효율적으로 처리할 수 있음

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential()
model.add(keras.layers.InputLayer(input_shape=[None, 1]))
for rate in (1, 2, 4, 8) * 2:
    model.add(keras.layers.Conv1D(filters=20, kernel_size=2, padding="causal",
                                  activation="relu", dilation_rate=rate))
model.add(keras.layers.Conv1D(filters=10, kernel_size=1))
model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))