In [1]:
import torch
from torch import nn, Tensor
import math
import torch.nn.functional as F
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from pandas import to_datetime
from sklearn.model_selection import train_test_split
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import mean_absolute_error, r2_score
import numpy as np
import gc

In [2]:
de = pd.read_csv('DETotal.csv')
kepco = pd.read_csv('kepcoTotal.csv')

In [3]:
# 1. 'Hourly Time'을 datetime 객체로 변환
de['Hourly Time'] = to_datetime(de['Hourly Time'], format='%d.%m.%Y %H:%M')

# 2. 추가적인 시간 관련 특징을 추출 (예: 요일, 시간 등)
de['weekday'] = de['Hourly Time'].dt.dayofweek  # 요일을 나타내는 0 (월요일)에서 6 (일요일)까지의 숫자
de['hour'] = de['Hourly Time'].dt.hour          # 시간 (0-23)

# 3. 'Hourly Sum' 전력 사용량 데이터 정규화
scaler = StandardScaler()
de['Hourly Sum'] = scaler.fit_transform(de[['Hourly Sum']])

# 데이터 프레임에서 필요한 열만 선택
processed_de = de[['Hourly Sum', 'weekday', 'hour']]

# 출력을 위한 정제된 데이터의 처음 몇 행을 보여줌
processed_de.head()

Unnamed: 0,Hourly Sum,weekday,hour
0,0.250072,3,0
1,0.220342,3,1
2,0.193523,3,2
3,0.179949,3,3
4,0.166708,3,4


In [4]:
train_de = processed_de[:int(0.8*len(processed_de))]
test_de = processed_de[int(0.8*len(processed_de)):]
train_de.shape, test_de.shape

((61364, 3), (15341, 3))

In [5]:
# 설정된 시퀀스 길이와 예측 길이
sequence_length = 168  # 168 hours = 7 days
prediction_length = 24  # 24 hours = 1 day

# 시퀀스 데이터를 생성하는 함수
def create_sequences(de, sequence_length, prediction_length):
    X = []
    y = []

    for i in range(len(de) - sequence_length - prediction_length + 1):
        X.append(de[i:(i + sequence_length)].values)
        y.append(de[(i + sequence_length):(i + sequence_length + prediction_length)]['Hourly Sum'].values)

    # numpy 배열로 변환
    X = np.array(X)
    y = np.array(y)

    return X, y

# 입력 특징과 타겟 변수를 기반으로 시퀀스 데이터 생성
X_test, y_test = create_sequences(test_de, sequence_length, prediction_length)
# 결과 확인
print(X_test.shape, y_test.shape)

(15150, 168, 3) (15150, 24)


In [6]:
#device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [7]:
class TransformerModel(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, output_dim, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.model_dim = model_dim
        self.input_linear = nn.Linear(input_dim, model_dim)
        self.positional_encoding = nn.Parameter(torch.randn(1, sequence_length, model_dim))
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim, nhead=num_heads, dim_feedforward=model_dim * 4, dropout=dropout,batch_first = True
        )
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.output_linear = nn.Linear(model_dim, output_dim)

    def generate_square_subsequent_mask(self, sz):
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

    def forward(self, x):
        x = self.input_linear(x) + self.positional_encoding
        mask = self.generate_square_subsequent_mask(x.size(1)).to(x.device)
        x = self.transformer_encoder(x, mask)
        return self.output_linear(x[:, -1, :])

# 모델 인스턴스 생성
input_dim = X_test.shape[-1]  # 입력 차원
model_dim = 512  # 모델의 특징 차원
num_heads = 8  # 어텐션 헤드의 수
num_layers = 3  # 인코더 레이어의 수
output_dim = y_test.shape[-1]  # 출력 차원

model = TransformerModel(input_dim, model_dim, num_heads, num_layers, output_dim)

# 손실 함수와 옵티마이저
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 모델 요약
model, criterion, optimizer
print(output_dim)

24


In [8]:
def split_data(X_test,split_len):
    data_list = []
    test_len = int(len(X_test)/split_len)
    for i in range(test_len):
        data_list.append(X_test[i:i+split_len])
        i+=150
    return data_list, test_len

In [9]:
data_list,test_len = split_data(X_test,150)

In [10]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

TransformerModel(
  (input_linear): Linear(in_features=3, out_features=512, bias=True)
  (encoder_layer): TransformerEncoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
    )
    (linear1): Linear(in_features=512, out_features=2048, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=2048, out_features=512, bias=True)
    (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
  )
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-2): 3 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
        )
        (linear1): Linear(in_features=512, out_featur

In [11]:
def to_tensor(X_test):
    X_test_tensor = torch.tensor(X_test,dtype=torch.float32).to(device)
    return X_test_tensor
y_test_tensor = torch.tensor(y_test,dtype=torch.float32)

In [12]:
for i in range(test_len):
    data_list[i] = to_tensor(data_list[i])

In [13]:
model.load_state_dict(torch.load('model_weights3.pth'))
predictions_tensor = []

model.eval()
with torch.no_grad():
    for i in range(test_len):
        temp = model(data_list[i])
        predictions_tensor.append(temp)
        gc.collect()
        torch.cuda.empty_cache()

In [14]:
predictions_tensor = [tensor.cpu() for tensor in predictions_tensor]

In [15]:
predictions = np.array(predictions_tensor)
predictions.shape

(101, 150, 24)

In [17]:
predictions = predictions.reshape(-1,predictions.shape[2])

In [18]:
y_test = np.array(y_test_tensor)

In [24]:
mae = mean_absolute_error(y_test, predictions).round(5)

# Calculate MAPE
def mean_absolute_percentage_error(y_test, predictions): 
    y_test, predictions = np.array(y_test), np.array(predictions)
    non_zero_mask = y_test != 0  # To avoid division by zero
    return np.mean(np.abs((y_test[non_zero_mask] - predictions[non_zero_mask]) / y_test[non_zero_mask])) * 100
mape = mean_absolute_percentage_error(y_test, predictions).round(5)

# Calculate R-squared (R²)
r_squared = r2_score(y_test, predictions).round(5)

# Print the metrics
print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Absolute Percentage Error (MAPE): {mape}%")
print(f"R-squared (R²): {r_squared}")

Mean Absolute Error (MAE): 0.18437999486923218
Mean Absolute Percentage Error (MAPE): 57.1024%
R-squared (R²): -0.4888
