In [1]:
import os
import random
import numpy as np
import tensorflow as tf

def set_random_seed(seed=42):
    """
    랜덤 시드 고정
    :param seed: 고정할 랜덤 시드 값 (기본값: 42)
    """
    os.environ['PYTHONHASHSEED'] = str(seed)  # Python 랜덤 시드
    random.seed(seed)                         # random 모듈 랜덤 시드
    np.random.seed(seed)                      # NumPy 랜덤 시드
    tf.random.set_seed(seed)                  # TensorFlow 랜덤 시드

In [2]:
import pandas as pd
import numpy as np
from tensorflow.keras.layers import Input, Dense, Layer, Embedding, Add, LayerNormalization
from tensorflow.keras.models import Model
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

# CSV 파일 로드
file_path = "./ETTh1.csv"
df = pd.read_csv(file_path)

# 날짜를 datetime으로 변환 후 인덱스 설정
df["date"] = pd.to_datetime(df["date"])
df = df.set_index("date")

# 입력에 사용할 Feature와 출력에 사용할 Target 분리
input_features = ['HUFL', 'HULL', 'MUFL', 'MULL', 'LUFL', 'LULL']  # 입력 특성 (6개)
target_feature = ['OT']  # 출력 대상 (1개)

# 입력 및 출력 데이터 준비
X_raw = df[input_features].values  # 입력 데이터
y_raw = df[target_feature].values  # 출력 데이터

# 데이터 정규화 (MinMaxScaler)
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()
X_scaled = scaler_X.fit_transform(X_raw)  # 입력 데이터 정규화
y_scaled = scaler_y.fit_transform(y_raw)  # 출력 데이터 정규화

# 입력 및 출력 시퀀스 길이 설정
input_length = 96  # 입력 시퀀스 길이
output_length = 720  # 예측 시퀀스 길이 96, 192, 336, 720

# 입력 및 출력 시퀀스 생성
X, y = [], []
for i in range(len(X_scaled) - input_length - output_length + 1):
    X.append(X_scaled[i : i + input_length])  # 입력 시퀀스
    y.append(y_scaled[i + input_length : i + input_length + output_length])  # 출력 시퀀스
X, y = np.array(X), np.array(y)

# Train/Test Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 결과 확인
print(f"X_train shape: {X_train.shape}")  # (batch_size, input_length, input_features)
print(f"y_train shape: {y_train.shape}")  # (batch_size, output_length, target_features)

X_train shape: (13284, 96, 6)
y_train shape: (13284, 720, 1)


In [3]:
class PositionalEncodingLayer(Layer):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncodingLayer, self).__init__()
        self.d_model = d_model
        self.max_len = max_len

    def call(self, inputs):
        seq_len = tf.shape(inputs)[1]
        position = tf.cast(tf.range(0, seq_len)[:, tf.newaxis], tf.float32)  # (seq_len, 1)
        div_term = tf.exp(tf.range(0, self.d_model, 2, dtype=tf.float32) * -(tf.math.log(10000.0) / self.d_model))  # (d_model/2)

        # Calculate sinusoidal positional encoding
        angle_rads = position * div_term  # Broadcasting (seq_len, d_model/2)
        sin_encoding = tf.sin(angle_rads)  # (seq_len, d_model/2)
        cos_encoding = tf.cos(angle_rads)  # (seq_len, d_model/2)

        # Combine sin and cos encodings
        pe = tf.concat([sin_encoding, cos_encoding], axis=-1)  # (seq_len, d_model)
        pe = tf.expand_dims(pe, axis=0)  # Add batch dimension (1, seq_len, d_model)

        return inputs + pe[:, :seq_len, :]


In [21]:
class CNNPositionalEncodingLayer(Layer):
    def __init__(self, d_model, num_cnn_layers=3, kernel_size=3, activation="relu"):
        super(CNNPositionalEncodingLayer, self).__init__()
        self.d_model = d_model
        self.num_cnn_layers = num_cnn_layers
        self.kernel_size = kernel_size
        self.activation = activation

        # CNN 레이어 쌓기
        self.conv_layers = [
            tf.keras.layers.Conv1D(
                filters=d_model,
                kernel_size=self.kernel_size,
                padding="same",
                activation=self.activation
            )
            for _ in range(self.num_cnn_layers)
        ]

        # 마지막 1D CNN 레이어로 d_model 출력 보장
        self.final_conv = tf.keras.layers.Conv1D(
            filters=d_model,
            kernel_size=1,
            padding="same",
            activation=None
        )

    def call(self, inputs):
        x = inputs
        for conv in self.conv_layers:
            x = conv(x)

        # 마지막 CNN 레이어로 d_model 크기의 출력 생성
        pos_encoding = self.final_conv(x)

        # 포지셔널 인코딩 추가
        return inputs + pos_encoding

# Vanila Transformer(encoder-only)

In [22]:
def build_transformer(input_length, input_dim, output_length, d_model, num_heads, num_layers, pos = 0):
    # 입력 레이어
    inputs = Input(shape=(input_length, input_dim))  # (batch_size, input_length, input_dim)
    x = Dense(d_model)(inputs)  # Feature 차원을 input_dim → d_model로 변환
    
    # 포지셔널 인코딩 방식 선택
    if pos == 0:
        x = PositionalEncodingLayer(d_model)(x)  # 기존 Positional Encoding 추가
    elif pos == 1:
        x = CNNPositionalEncodingLayer(d_model)(x)  # CNN 기반 Positional Encoding 추가

    # Transformer 블록
    for _ in range(num_layers):
        # Multi-Head Attention
        attention_output = tf.keras.layers.MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(x, x)
        attention_output = tf.keras.layers.Dropout(0.1)(attention_output)
        attention_output = Add()([x, attention_output])  # Residual Connection
        attention_output = tf.keras.layers.LayerNormalization(epsilon=1e-6)(attention_output)

        # Feed Forward Network (FFN)
        ffn_output = Dense(d_model * 4, activation="relu")(attention_output)
        ffn_output = Dense(d_model)(ffn_output)
        ffn_output = tf.keras.layers.Dropout(0.1)(ffn_output)
        x = Add()([attention_output, ffn_output])  # Residual Connection
        x = tf.keras.layers.LayerNormalization(epsilon=1e-6)(x)

    # 출력 레이어
    # 입력 시퀀스 길이를 출력 시퀀스 길이로 확장
    x = tf.keras.layers.GlobalAveragePooling1D()(x)  # 시간축 제거 (batch_size, d_model)
    x = tf.keras.layers.RepeatVector(output_length)(x)  # (batch_size, output_length, d_model)
    x = Dense(d_model, activation="relu")(x)  # 중간 레이어
    outputs = Dense(1, activation="linear")(x)  # 최종 출력 (batch_size, output_length, 1)

    return Model(inputs, outputs)


In [23]:
set_random_seed()

# 모델 생성
input_dim = X_train.shape[-1]   # 입력 데이터의 특성 차원

d_model = 64
num_heads = 4   # Multi-Head Attention에서의 head 개수
num_layers = 2  # Transformer 블록의 레이어 수
pos = 1

# Transformer 모델 생성
transformer_model = build_transformer(input_length, input_dim, output_length, d_model, num_heads, num_layers, pos)

# 모델 컴파일
transformer_model.compile(optimizer="adam", loss="mse", metrics=["mae"])
transformer_model.summary()


Model: "model_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_6 (InputLayer)            [(None, 96, 6)]      0                                            
__________________________________________________________________________________________________
dense_32 (Dense)                (None, 96, 64)       448         input_6[0][0]                    
__________________________________________________________________________________________________
cnn_positional_encoding_layer_2 (None, 96, 64)       41216       dense_32[0][0]                   
__________________________________________________________________________________________________
multi_head_attention_12 (MultiH (None, 96, 64)       66368       cnn_positional_encoding_layer_2[0
                                                                 cnn_positional_encoding_lay

In [24]:
# 모델 학습
history = transformer_model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=20, batch_size=256)

# 모델 평가
loss, mae = transformer_model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}, Test MAE: {mae}")

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test Loss: 0.004863376263529062, Test MAE: 0.05393539369106293


# Vanila Transformer(encoder-decoder)

In [8]:
from tensorflow.keras.layers import Input, Dense, Add, LayerNormalization, MultiHeadAttention, Dropout, RepeatVector, TimeDistributed
from tensorflow.keras.models import Model

def build_transformer(input_length, input_dim, output_length, d_model, num_heads, num_layers, pos=0):
    # 인코더 입력
    encoder_inputs = Input(shape=(input_length, input_dim))  # (batch_size, input_length, input_dim)
    x = Dense(d_model)(encoder_inputs)  # Feature 차원을 input_dim → d_model로 변환
    
    # 포지셔널 인코딩 추가
    if pos == 0:
        x = PositionalEncodingLayer(d_model)(x)
    elif pos == 1:
        x = CNNPositionalEncodingLayer(d_model)(x)

    # 인코더 블록
    for _ in range(num_layers):
        # Self-Attention
        attention_output = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(x, x)
        attention_output = Dropout(0.1)(attention_output)
        attention_output = Add()([x, attention_output])  # Residual Connection
        attention_output = LayerNormalization(epsilon=1e-6)(attention_output)

        # Feed Forward Network
        ffn_output = Dense(d_model * 4, activation="relu")(attention_output)
        ffn_output = Dense(d_model)(ffn_output)
        ffn_output = Dropout(0.1)(ffn_output)
        x = Add()([attention_output, ffn_output])  # Residual Connection
        x = LayerNormalization(epsilon=1e-6)(x)

    encoder_outputs = x  # 인코더 출력: (batch_size, input_length, d_model)

    # 디코더 입력
    decoder_inputs = RepeatVector(output_length)(encoder_outputs[:, -1, :])  # 마지막 타임스텝의 상태 복제 (batch_size, output_length, d_model)
    y = decoder_inputs

    # 디코더 블록
    for _ in range(num_layers):
        # Self-Attention in Decoder
        self_attention = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(y, y)
        self_attention = Dropout(0.1)(self_attention)
        self_attention = Add()([y, self_attention])  # Residual Connection
        self_attention = LayerNormalization(epsilon=1e-6)(self_attention)

        # Cross-Attention with Encoder Outputs
        cross_attention = MultiHeadAttention(num_heads=num_heads, key_dim=d_model)(self_attention, encoder_outputs)
        cross_attention = Dropout(0.1)(cross_attention)
        cross_attention = Add()([self_attention, cross_attention])  # Residual Connection
        cross_attention = LayerNormalization(epsilon=1e-6)(cross_attention)

        # Feed Forward Network
        ffn_output = Dense(d_model * 4, activation="relu")(cross_attention)
        ffn_output = Dense(d_model)(ffn_output)
        ffn_output = Dropout(0.1)(ffn_output)
        y = Add()([cross_attention, ffn_output])  # Residual Connection
        y = LayerNormalization(epsilon=1e-6)(y)

    # 디코더 출력
    outputs = TimeDistributed(Dense(1, activation="linear"))(y)  # (batch_size, output_length, 1)

    # 모델 생성
    return Model(inputs=encoder_inputs, outputs=outputs)


In [9]:
set_random_seed()

# 모델 생성
input_dim = X_train.shape[-1]   # 입력 데이터의 특성 차원

d_model = 64
num_heads = 4   # Multi-Head Attention에서의 head 개수
num_layers = 2  # Transformer 블록의 레이어 수
pos = 0

# Transformer 모델 생성
transformer_model = build_transformer(input_length, input_dim, output_length, d_model, num_heads, num_layers, pos)

# 모델 컴파일
transformer_model.compile(optimizer="adam", loss="mse", metrics=["mae"])
transformer_model.summary()


Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_2 (InputLayer)            [(None, 96, 6)]      0                                            
__________________________________________________________________________________________________
dense_7 (Dense)                 (None, 96, 64)       448         input_2[0][0]                    
__________________________________________________________________________________________________
positional_encoding_layer_1 (Po (None, 96, 64)       0           dense_7[0][0]                    
__________________________________________________________________________________________________
multi_head_attention_2 (MultiHe (None, 96, 64)       66368       positional_encoding_layer_1[0][0]
                                                                 positional_encoding_layer_1

In [10]:
# 학습
history = transformer_model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=20,
    batch_size=256
)

# 평가
loss, mae = transformer_model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}, Test MAE: {mae}")

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20
Test Loss: 0.012500489130616188, Test MAE: 0.08852755278348923


# LSTM 

In [11]:
from tensorflow.keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed
from tensorflow.keras.models import Model

def build_lstm(input_length, input_dim, output_length, lstm_units):
    """
    LSTM 기반 시계열 예측 모델
    Args:
        input_length: 입력 시퀀스 길이
        input_dim: 입력 데이터 차원 (특성 수)
        output_length: 출력 시퀀스 길이
        lstm_units: LSTM 유닛 수
    Returns:
        Model: 생성된 Keras 모델
    """
    # 입력 레이어
    inputs = Input(shape=(input_length, input_dim))  # (batch_size, input_length, input_dim)
    
    # LSTM 인코더
    x = LSTM(lstm_units, return_sequences=False)(inputs)  # (batch_size, lstm_units)
    
    # RepeatVector로 디코더 입력 생성
    x = RepeatVector(output_length)(x)  # (batch_size, output_length, lstm_units)
    
    # LSTM 디코더
    x = LSTM(lstm_units, return_sequences=True)(x)  # (batch_size, output_length, lstm_units)
    
    # TimeDistributed를 사용한 Dense 출력
    outputs = TimeDistributed(Dense(1))(x)  # (batch_size, output_length, 1)
    
    return Model(inputs, outputs)


In [12]:
set_random_seed()

# 모델 생성
lstm_units = 64  # LSTM 유닛 수
lstm_model = build_lstm(input_length, input_dim, output_length, lstm_units)

# 모델 컴파일
lstm_model.compile(optimizer="adam", loss="mse", metrics=["mae"])
lstm_model.summary()

# 모델 학습
history = lstm_model.fit(
    X_train, y_train,
    validation_data=(X_test, y_test),
    epochs=20,
    batch_size=256
)

# 모델 평가
loss, mae = lstm_model.evaluate(X_test, y_test)
print(f"Test Loss: {loss}, Test MAE: {mae}")


Model: "model_2"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_3 (InputLayer)         [(None, 96, 6)]           0         
_________________________________________________________________
lstm (LSTM)                  (None, 64)                18176     
_________________________________________________________________
repeat_vector_2 (RepeatVecto (None, 720, 64)           0         
_________________________________________________________________
lstm_1 (LSTM)                (None, 720, 64)           33024     
_________________________________________________________________
time_distributed_1 (TimeDist (None, 720, 1)            65        
Total params: 51,265
Trainable params: 51,265
Non-trainable params: 0
_________________________________________________________________
Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/2