In [None]:
import sys
import sklearn
import tensorflow as tf
from tensorflow import keras
import numpy as np
import os
from pathlib import Path

np.random.seed(42)
tf.random.set_seed(42)

%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)
plt.style.use('seaborn-darkgrid')

### Define the time-series sample generator

In [None]:
def generate_time_series(batch_size, n_steps):
    """Generate a batch of synthetic univariate time series.

    Each series is the sum of two sine waves with random frequency and
    phase offset, plus uniform noise. Random numbers come from NumPy's
    global generator, so seed it beforehand for reproducible output.

    Args:
        batch_size: number of series (observations) to generate.
        n_steps: number of time steps per series.

    Returns:
        float32 array of shape (batch_size, n_steps, 1).
    """
    # One draw supplies all four per-series parameters, each (batch_size, 1).
    params = np.random.rand(4, batch_size, 1)
    freq1, freq2, offsets1, offsets2 = params
    t = np.linspace(0, 1, n_steps)
    wave_slow = 0.5 * np.sin((t - offsets1) * (freq1 * 10 + 10))  # wave 1
    wave_fast = 0.2 * np.sin((t - offsets2) * (freq2 * 20 + 20))  # wave 2
    jitter = 0.1 * (np.random.rand(batch_size, n_steps) - 0.5)    # noise
    signal = (wave_slow + wave_fast) + jitter
    # Append a trailing feature axis of size 1.
    return signal[..., np.newaxis].astype(np.float32)

In [None]:
np.random.seed(42)

n_steps = 50
# Task: given 50 observed steps, forecast the following 10 steps.
series = generate_time_series(10000, n_steps + 10)
X_train = series[:7000, :n_steps]
X_valid = series[7000:9000, :n_steps]
X_test = series[9000:, :n_steps]
# Sequence-to-sequence targets: for every input time step t the target is the
# vector of the next 10 values, i.e. Y[:, t, k] = series[:, t + k + 1, 0].
# Allocate with the same length and dtype as `series` (float32) instead of the
# np.empty default (float64), which would double memory for no benefit.
Y = np.empty((series.shape[0], n_steps, 10), dtype=series.dtype)
for step_ahead in range(1, 10 + 1):
    Y[..., step_ahead - 1] = series[..., step_ahead:step_ahead + n_steps, 0]
Y_train = Y[:7000]
Y_valid = Y[7000:9000]
Y_test = Y[9000:]

# RNN with Batch Normalization

In [None]:
np.random.seed(42)
tf.random.set_seed(42)


# Defined here because the compile() call below is the first use of this
# metric in the notebook; without it a fresh "Restart & Run All" stops with
# a NameError in this cell.
def last_time_step_mse(Y_true, Y_pred):
    """Mean squared error computed on the last time step only."""
    return keras.metrics.mean_squared_error(Y_true[:, -1], Y_pred[:, -1])


# Applying Batch Normalization across time steps is technically difficult.
# Instead it can be inserted "vertically", between the stacked recurrent layers.
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.BatchNormalization(),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.BatchNormalization(),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

# `learning_rate` replaces the deprecated `lr` keyword.
model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.01),
              metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=10,
                    validation_data=(X_valid, Y_valid))

# RNN cell with Layer Normalization

In [None]:
class LNSimpleRNNCell(keras.layers.Layer):
    """Simple RNN memory cell with Layer Normalization applied inside the cell.

    Each step runs three stages: a SimpleRNNCell without activation (the
    plain linear combination of input and previous state), then
    LayerNormalization, then the chosen activation.
    """

    def __init__(self, units, activation="tanh", **kwargs):
        """Create the cell.

        Args:
            units: number of units; used as both the state size and the
                output size.
            activation: activation applied after normalization; resolved
                through `keras.activations.get`.
            **kwargs: forwarded to `keras.layers.Layer`.
        """
        super().__init__(**kwargs)
        # The hidden-state vector and the output vector both have `units` entries.
        self.state_size = self.output_size = units
        # Inner cell performs only the linear combination (no activation).
        self.simple_rnn_cell = keras.layers.SimpleRNNCell(units, activation=None)
        # Normalization is applied to the inner cell's output ...
        self.layer_norm = keras.layers.LayerNormalization()
        # ... followed by the activation (tanh unless overridden).
        self.activation = keras.activations.get(activation)

    def get_initial_state(self, inputs=None, batch_size=None, dtype=None):
        """Return the all-zeros initial state as a one-element list.

        When `inputs` is given, the batch size and dtype are taken from it
        and override the `batch_size` / `dtype` arguments.
        """
        if inputs is not None:
            batch_size, dtype = tf.shape(inputs)[0], inputs.dtype
        zero_state = tf.zeros([batch_size, self.state_size], dtype=dtype)
        return [zero_state]

    def call(self, inputs, states):
        """Run one time step and return `(output, [new_state])`.

        The returned output and the new state are the same tensor.
        """
        # The inner cell also returns its own new state; it is discarded
        # because the normalized, activated output becomes the state instead.
        linear_out, _ = self.simple_rnn_cell(inputs, states)
        # Normalize the (batch, units) output, then apply the activation.
        normalized = self.layer_norm(linear_out)
        activated = self.activation(normalized)
        return activated, [activated]

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

# Two stacked recurrent layers built from the layer-normalized cell, followed
# by a per-time-step Dense head that emits 10 values at each step.
model = keras.models.Sequential()
model.add(keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True,
                           input_shape=[None, 1]))
model.add(keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(10)))

def last_time_step_mse(Y_true, Y_pred):
    """Mean squared error computed on the last time step only.

    Both arguments are indexed with `[:, -1]`, i.e. the final entry along
    axis 1, before being passed to `keras.metrics.mean_squared_error`.
    """
    final_true = Y_true[:, -1]
    final_pred = Y_pred[:, -1]
    return keras.metrics.mean_squared_error(final_true, final_pred)

# The learning rate has a large impact on performance here.
# `learning_rate` replaces the deprecated `lr` keyword.
model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.01),
              metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=10,
                    validation_data=(X_valid, Y_valid))

model.evaluate(X_test, Y_test)

# LSTM

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

# Two stacked LSTM layers with a per-time-step Dense head (10 outputs per step).
model = keras.models.Sequential([
    keras.layers.LSTM(20, return_sequences = True, input_shape = [None, 1]),
    keras.layers.LSTM(20, return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

# `learning_rate` replaces the deprecated `lr` keyword.
model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.01),
              metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=10,
                    validation_data=(X_valid, Y_valid))

model.evaluate(X_test, Y_test)

# The same model can also be built by wrapping LSTMCell in the general-purpose
# RNN layer, which is the route to take when using custom cells.
# The LSTM layer above is generally preferred, however, because it uses an
# optimized implementation when running on a GPU.
# model = keras.models.Sequential([
#     keras.layers.RNN(keras.layers.LSTMCell(20), return_sequences = True,
#                      input_shape = [None, 1]),
#     keras.layers.RNN(keras.layers.LSTMCell(20), return_sequences = True),
#     keras.layers.TimeDistributed(keras.layers.Dense(10))
# ])

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

# Peephole connections.
# Somewhat slower than a plain LSTM because of the extra computation, but it
# often improves predictive performance.
# NOTE(review): keras.experimental.PeepholeLSTMCell may not be available in
# every TensorFlow release - confirm against the installed version.
model = keras.models.Sequential([
    keras.layers.RNN(keras.experimental.PeepholeLSTMCell(20),
                     return_sequences = True,
                     input_shape = [None, 1]),
    keras.layers.RNN(keras.experimental.PeepholeLSTMCell(20),
                     return_sequences = True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

# `learning_rate` replaces the deprecated `lr` keyword.
model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.01),
              metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=10,
                    validation_data=(X_valid, Y_valid))

model.evaluate(X_test, Y_test)

# GRU

In [None]:
np.random.seed(42)
tf.random.set_seed(42)

# Two stacked GRU layers with a per-time-step Dense head (10 outputs per step).
model = keras.models.Sequential()
model.add(keras.layers.GRU(20, return_sequences=True, input_shape=[None, 1]))
model.add(keras.layers.GRU(20, return_sequences=True))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(10)))

# Adam with its default settings; the last-step MSE is tracked as a metric.
model.compile(optimizer="adam", loss="mse", metrics=[last_time_step_mse])
history = model.fit(
    X_train, Y_train,
    validation_data=(X_valid, Y_valid),
    epochs=10,
)

# 1D Convolution for sequence compressing

In [None]:
# Best performance (per the original author's note).
np.random.seed(42)
tf.random.set_seed(42)

# A strided 1D convolution shortens the sequence before the GRU layers.
model = keras.models.Sequential()
model.add(keras.layers.Conv1D(filters=20, kernel_size=4, strides=2,
                              padding="valid", input_shape=[None, 1]))
model.add(keras.layers.GRU(20, return_sequences=True))
model.add(keras.layers.GRU(20, return_sequences=True))
model.add(keras.layers.TimeDistributed(keras.layers.Dense(10)))

model.compile(optimizer="adam", loss="mse", metrics=[last_time_step_mse])
# With kernel_size=4, strides=2 and "valid" padding the 50-step input becomes
# 24 steps, so the targets are subsampled with [:, 3::2] (steps 3, 5, ..., 49)
# to line up with the convolution's outputs.
history = model.fit(
    X_train, Y_train[:, 3::2],
    validation_data=(X_valid, Y_valid[:, 3::2]),
    epochs=20,
)

In [None]:
# Toy WaveNet: a stack of causal, dilated 1D convolutions.
np.random.seed(42)
tf.random.set_seed(42)

model = keras.models.Sequential([
    keras.layers.InputLayer(input_shape=[None, 1]),
])
# Two consecutive groups of layers with dilation rates 1, 2, 4, 8.
for rate in (1, 2, 4, 8, 1, 2, 4, 8):
    # Dilated convolution; "causal" padding keeps the sequence length unchanged.
    model.add(keras.layers.Conv1D(filters=20, kernel_size=2,
                                  dilation_rate=rate, padding="causal",
                                  activation="relu"))
# Output layer: a 1x1 convolution producing 10 values per time step.
model.add(keras.layers.Conv1D(filters=10, kernel_size=1))

model.compile(optimizer="adam", loss="mse", metrics=[last_time_step_mse])
history = model.fit(
    X_train, Y_train,
    validation_data=(X_valid, Y_valid),
    epochs=20,
)