### 시계열 예측하기

In [12]:
import numpy as np
from tensorflow import keras
import tensorflow as tf

  from pandas.core import (


In [4]:
def generate_time_series(batch_size, n_steps) : # 단변량
    freq1, freq2, offsets1, offsets2 = np.random.rand(4, batch_size, 1)
    time = np.linspace(0, 1, n_steps)
    series = 0.5 * np.sin((time - offsets1) * (freq1 * 10 + 10)) # 사인 곡선1
    series += 0.2 * np.sin((time - offsets2) * (freq2 * 20 + 20)) # +사인 곡선2
    series += 0.1 * (np.random.rand(batch_size, n_steps) - 0.5) # +잡읍
    return series[..., np.newaxis].astype(np.float32) # (배치크기, 타임스텝수, 1) 크기 반환

In [81]:
n_steps = 50
series = generate_time_series(10000, n_steps + 1)
X_train, y_train = series[:7000, :n_steps], series[:7000, -1]
X_valid, y_valid = series[7000:9000, :n_steps], series[7000:9000, -1]
X_test, y_test = series[9000:, :n_steps], series[9000:, -1]

In [82]:
X_train.shape, y_train.shape

((7000, 50, 1), (7000, 1))

In [23]:
# 기준 성능 - 순진한 예측 : 각 시계열의 마지막 값 그대로 예측
y_pred = X_valid[:, -1] # (2000, 1)
np.mean(keras.losses.mean_squared_error(y_valid, y_pred))

0.020684227

In [14]:
# 완전 연결 네트워크 이용 - 선형 예측
model = keras.models.Sequential([
    keras.layers.Flatten(input_shape = [50, 1]),
    keras.layers.Dense(1)
])

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [16]:
model.evaluate(X_valid, y_valid)



0.003547875676304102

In [17]:
# 간단한 RNN 구현하기
model = keras.models.Sequential([
    keras.layers.SimpleRNN(1, input_shape=[None, 1])
])
# RNN은 어떤 길이의 타임 스텝도 처리할 수 있기에, 입력 시퀀스의 길이 지정할 필요 없다.

optimizer = keras.optimizers.Adam(learning_rate=0.005)
model.compile(loss="mse", optimizer=optimizer)
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [18]:
model.evaluate(X_valid, y_valid)



0.011437698267400265

In [19]:
# 심층 RNN
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.SimpleRNN(1)
])

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [20]:
model.evaluate(X_valid, y_valid) 



0.0037151025608181953

In [84]:
# 기본적으로 tanh 활성화 함수 사용, 다른 활성화 함수 사용
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(1)
])

model.compile(loss="mse", optimizer="adam")
history = model.fit(X_train, y_train, epochs=20,
                    validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [85]:
model.evaluate(X_valid, y_valid)



0.0027346147689968348

MSE로 보는  
성능 : 순진한 예측 < 간단한 RNN < 선형 모델 < 심층 RNN < 출력층 Dense

#### 여러 타임 스텝 앞을 예측하기

In [86]:
# 1. 이미 훈련된 모델 사용하여 다음 값 예측한 다음, 이 값을 입력으로 추가
# 다음 스텝에 대한 예측은 보통 더 미래의 타임스텝에 대한 예측보다 정확함
# 한 번에 하나의 미래 스텝을 예측하기 위해 RNN 사용하는 것보다 나음
series = generate_time_series(1, n_steps + 10)
X_new, Y_new = series[:, :n_steps], series[:, n_steps:]
X = X_new

for step_ahead in range(10) :
    y_pred_one = model.predict(X[:, step_ahead:])[:, np.newaxis, :]
    X = np.concatenate([X, y_pred_one], axis=1)



In [87]:
Y_pred = X[:, n_steps:]
Y_pred.shape

(1, 10, 1)

In [88]:
np.mean(keras.losses.mean_squared_error(Y_pred, Y_new))

0.0029259264

In [92]:
# 2. RNN 훈련해 다음 값 10개 한번에 예측
# 시퀀스-투-벡터 모델 사용하지만, 1개가 아닌 10개 값 출력
series = generate_time_series(10000, n_steps+10)
X_train, Y_train = series[:7000, :n_steps], series[:7000, -10:, 0]
X_valid, Y_valid = series[7000:9000, :n_steps], series[7000:9000, -10:, 0]
X_test, Y_test = series[9000:, :n_steps], series[9000:, -10:, 0]

In [93]:
X = X_valid
for step_ahead in range(10) :
    y_pred_one = model.predict(X)[:, np.newaxis, :]
    X = np.concatenate([X, y_pred_one], axis=1)
    
Y_pred = X[:, n_steps:, 0]
Y_pred.shape



(2000, 10)

In [94]:
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20),
    keras.layers.Dense(10)
])

model.compile(loss='mse', optimizer='adam')
model.fit(X_train, Y_train, epochs=20, validation_data=(X_valid, y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


<keras.src.callbacks.History at 0x197e9e00450>

In [96]:
model.evaluate(X_valid, Y_valid)



0.008630957454442978

In [97]:
y_pred = model.predict(X_test)
np.mean(keras.losses.mean_squared_error(Y_test,y_pred)) # 좋은 성능



0.008936382

In [98]:
# 타임 스텝마다 모델은 이전 타임 스템만 볼 수 있고, 앞을 볼 수 없다. -> 인과 모델
# 타깃 시퀀스
n_steps = 50
Y =  np.empty((10000, n_steps, 10))
for step_ahead in range(1, 10+1) :
    Y[:, :, step_ahead-1] = series[:, step_ahead:step_ahead + n_steps, 0]
Y_train = Y[:7000]
Y_valid = Y[7000:9000]
Y_test = Y[9000:]

In [99]:
X_train.shape, Y_train.shape

((7000, 50, 1), (7000, 50, 10))

In [100]:
# 시퀀스-투-시퀀스 모델로 변환
# 모든 타임 스텝에서 출력을 Dense층에 적용
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

def last_time_step_mse(Y_true, Y_pred):
    return keras.metrics.mean_squared_error(Y_true[:, -1], Y_pred[:, -1]) # 마지막 타임 스텝의 MSE만 계산

model.compile(loss="mse", optimizer=keras.optimizers.Adam(learning_rate=0.01), metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [102]:
model.evaluate(X_valid, Y_valid)



[0.017983851954340935, 0.00610963674262166]

In [101]:
y_pred = model.predict(X_test)
np.mean(keras.losses.mean_squared_error(Y_test,y_pred))



0.018196573

In [63]:
series = generate_time_series(1, 50 + 10)
X_new, Y_new = series[:, :50, :], series[:, 50:, :]
Y_pred = model.predict(X_new)[:, -1][..., np.newaxis]



In [65]:
np.mean(keras.losses.mean_squared_error(Y_new,Y_pred))

0.0072089895

In [104]:
# 배치 정규화 사용한 심층 RNN
model = keras.models.Sequential([
    keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.BatchNormalization(),
    keras.layers.SimpleRNN(20, return_sequences=True),
    keras.layers.BatchNormalization(),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [106]:
model.evaluate(X_valid, Y_valid)



[0.031494203954935074, 0.02101447433233261]

In [107]:
y_pred = model.predict(X_test)
np.mean(keras.losses.mean_squared_error(Y_test,y_pred))



0.03146015

### 긴 시퀀스 다루기
* 배치 정규화가 DNN에서는 효과적이지만, RNN에서는 효율적이지 않다.
* 타임 스텝 사이에 사용할 수 없고, 순환층 사이에서만 이용가능하다는 문제
* 층 정규화 방식 채택 : 특성 차원에 대하여 정규화
    * 입력과 은닉 상태의 선형 조합 직후에 사용, 스케일과 이동 파라미터가 하나씩 학습

In [70]:
from tensorflow.keras.layers import LayerNormalization

In [71]:
class LNSimpleRNNCell(keras.layers.Layer) :
    def __init__(self, units, activation='tanh', **kwargs) :
        super().__init__(**kwargs)
        self.state_size = units # 이전 타임 스텝 은닉
        self.output_size = units
        # 활성화함수 없이 SimpleRNNCell -> 선형 연산 후와 활성화 함수 전 층 정규화 수행하기 위해
        self.simple_rnn_cell = keras.layers.SimpleRNNCell(units, activation=None)
        self.layer_norm = LayerNormalization()
        self.activation = keras.activations.get(activation)
        
    def get_initial_state(self, inputs = None, batch_size=None, dtype=None) :
        if inputs is not None :
            batch_size = tf.shape(inputs)[0]
            dtype = inputs.dtype
        return [tf.zeros([batch_size, self.state_size], dtype=dtype)]
    
    def call(self, inputs, states) :
        outputs, new_states = self.simple_rnn_cell(inputs, states) # SimpleRNNCell에서 출력은 은닉상태와 동일(new_state[0]=outputs)
        norm_outputs = self.activation(self.layer_norm(outputs))
        return norm_outputs, [norm_outputs] # 출력, 새로운 은닉 상태

In [72]:
model = keras.models.Sequential([
    keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True, input_shape=[None,1]),
    keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

In [108]:
model.compile(loss='mse', optimizer='adam', metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [109]:
model.evaluate(X_valid, Y_valid)



[0.02497214823961258, 0.012774335220456123]

### LSTM

In [110]:
model = keras.models.Sequential([
    keras.layers.LSTM(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.LSTM(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [111]:
model.evaluate(X_valid, Y_valid)



[0.024193109944462776, 0.008899789303541183]

### GRU

In [112]:
model = keras.models.Sequential([
    keras.layers.GRU(20, return_sequences=True, input_shape=[None, 1]),
    keras.layers.GRU(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

In [113]:
model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [114]:
model.evaluate(X_valid, Y_valid)



[0.02308931015431881, 0.009285940788686275]

#### 1D 합성곱 층 사용해 시퀀스 처리하기
* LSTM, GRU : RNN보다 훨씬 긴 시퀀스 다룰 수 있지만, 매우 제한적인 단기 기억 가짐
* 장기 패턴 학습하는 데 어려움
* 해결 방법 : 1D 합성곱 층 사용해 입력 시퀀스 짧게 줄임
-----------------------
* 1 stride + same 패딩으로 1D 합성곱 층 사용하면, 출력 시퀀스 길이는 입력 시퀀스와 같다.
* 1보다 큰 stride + valid 패딩 : 출력 시퀀스 < 입력 시퀀스

In [115]:
# 합성곱 층으로 시퀀스 길이 줄이면 GRU 층이 더 긴 패턴 감지하는 데 도움
# 타깃에서 처음 세 개의 타임 스텝 버리고(커널 크기가 4이므로 합성 곱 층의 첫 출력은 0~3까지 타임 스텝으로 만들어짐)
# 두 배로 다운샘플 해야 함
model = keras.models.Sequential([
    keras.layers.Conv1D(filters=20, kernel_size=4, strides=2, padding="valid",
                        input_shape=[None, 1]),
    keras.layers.GRU(20, return_sequences=True),
    keras.layers.GRU(20, return_sequences=True),
    keras.layers.TimeDistributed(keras.layers.Dense(10))
])

model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train[:, 3::2], epochs=20,
                    validation_data=(X_valid, Y_valid[:, 3::2]))
# 첫 세 개 타입스텝 버리고, 커널 4, 스트라이드 2 이므로 [0,1,2,3] -> [2, 3, 4, 5]

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [118]:
model.evaluate(X_valid, Y_valid[:, 3::2]) # 가장 좋은 성능



[0.01838657073676586, 0.008430030196905136]

#### WAVENET

In [119]:
model = keras.models.Sequential()
model.add(keras.layers.InputLayer(input_shape=[None, 1]))
for rate in (1, 2, 4, 8) * 2:
    model.add(keras.layers.Conv1D(filters=20, kernel_size=2, padding="causal",
                                  activation="relu", dilation_rate=rate))
    # 미래 시퀀스 훔쳐보지 않음 (= 입력의 왼쪽에 0 패딩 + valid 패딩)
model.add(keras.layers.Conv1D(filters=10, kernel_size=1))
model.compile(loss="mse", optimizer="adam", metrics=[last_time_step_mse])
history = model.fit(X_train, Y_train, epochs=20,
                    validation_data=(X_valid, Y_valid))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [120]:
model.evaluate(X_valid, Y_valid) # 좋은 성능



[0.020382121205329895, 0.009263787418603897]