Import Library

In [None]:
import numpy as np
import pandas as pd
import datetime
import os
import copy

import matplotlib.pyplot as plt

# Scaling
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

# CustomDataSet
from torch.utils.data import DataLoader, Dataset

# Transformer
import math
import torch
from torch import nn, Tensor
import torch.optim as optim
from torch.nn import TransformerEncoder, TransformerEncoderLayer

from tqdm import tqdm

### Load Data

In [None]:
# Main feature table: the first CSV column is used as the index and parsed as datetimes.
data = pd.read_csv( '/data_selected.csv', index_col = 0 )
data.index = pd.to_datetime( data.index )

# DAE output table. Its index is overwritten with the index of `data` rather than
# parsed from its own file, so both tables must have the same number of rows.
# NOTE(review): confirm this is intentional and that both files are in the same row order.
data_dae = pd.read_csv( '/DAE_result.csv', index_col = 0 )
data_dae.index = pd.to_datetime( data.index )

# Independent copy of `data` whose '현재수요+태양광' column is set from the 'DAE' column.
data_DAE = copy.deepcopy( data )
data_DAE['현재수요+태양광'] = data_dae['DAE']
# Bare expression: displays the frame when run as a notebook cell.
data_DAE

### Transformer

Setting

In [None]:
# Hyper-parameters and date boundaries used by the Transformer pipeline.
#
#   ip             : length of the input window (rows)
#   op             : length of the output window (rows)
#   stride         : step between consecutive windows (rows)
#   batch          : batch size
#   num_indep      : number of independent variables
#   mode           : scaler type, 'min-max' or 'standard'
#   n_head         : number of attention heads
#   d_model        : embedding dimension
#   n_layer        : number of encoder layers
#   lr             : learning rate
#   epoch          : number of training epochs
#   patience_limit : early-stopping patience
#   dropout_TF     : dropout ratio of the encoder layers
#   dropout_PE     : dropout ratio of the positional encoding
#   train_start    : first timestamp of the training data
#   val_start      : first timestamp of the validation data
#   test_start     : first timestamp of the test data
#   test_last      : last timestamp of the test data

Setting = {
    # Windowing
    'ip': 720,
    'op': 72,
    'stride': 72,
    'batch': 32,
    # Every column of `data` except three counts as an independent variable.
    'num_indep': len(data.columns) - 3,
    'mode': 'standard',  # min-max / standard
    # Model size: one head per independent variable, 31 embedding units per head.
    'n_head': len(data.columns) - 3,
    'd_model': (len(data.columns) - 3) * 31,
    'n_layer': 2,
    # Optimisation
    'lr': 1e-4,
    'epoch': 150,
    'patience_limit': 7,
    'dropout_TF': 0.1,
    'dropout_PE': 0.1,
    # Date boundaries of the splits
    'train_start': '2022-03-01 00:00:00',
    'val_start': '2023-02-01 00:00:00',
    'test_start': '2023-03-13 00:00:00',
    'test_last': '2023-03-19 23:55:00',
}

### Functions

Transform to float32

In [None]:
# 데이터 타입을 float32로 바꿔주는 함수

def Transform_to_float32( data, num ) :
    """Cast the first ``num`` columns of ``data`` to float32, in place.

    Args:
        data: DataFrame whose leading columns are converted. It is modified
            directly; no copy is made.
        num: Number of leading columns to convert.

    Returns:
        The same DataFrame object that was passed in.
    """
    column_labels = data.columns

    for position in range( num ) :
        label = column_labels[ position ]
        converted = data[ label ].astype( np.float32 )
        data[ label ] = converted

    return data

Seed 고정

In [None]:
# Seed 고정 함수

def seed_everything(seed: int = 42):
    """Seed the random number generators used by the pipeline.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and every CUDA
    device), exports ``PYTHONHASHSEED`` and configures cuDNN for
    reproducible runs.

    Args:
        seed: Seed value applied to every generator.
    """
    import random  # local import: not among the file's top-level imports

    random.seed(seed)
    np.random.seed(seed)
    # Only visible to interpreters started after this point; the hash seed of
    # the running process is already fixed.
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode lets cuDNN pick algorithms by timing them, which can vary
    # between runs, so it must be off when determinism is requested.
    torch.backends.cudnn.benchmark = False

Split_train_test_set

In [None]:
# Train set, Validation set, Test set 분할 함수

def split_train_val_test(input_df, train_start, val_start, test_start, test_last, ip, freq_minutes = 5) :
    """Split a datetime-indexed frame into train, validation and test parts.

    The validation and test parts are extended backwards by ``ip`` rows so
    that their first window has a full input history. The training and
    validation parts end one row before the following part begins.

    Args:
        input_df: Frame indexed by timestamps; it is deep-copied, not modified.
        train_start: First training timestamp, formatted '%Y-%m-%d %H:%M:%S'.
        val_start: First validation timestamp, same format.
        test_start: First test timestamp, same format.
        test_last: Last test timestamp; used as-is as the upper slice bound.
        ip: Input window length in rows.
        freq_minutes: Sampling interval of the index in minutes. Defaults to 5,
            the value that was previously hard-coded.

    Returns:
        Tuple ``(train, val, test)`` of label-sliced frames (bounds inclusive).
    """
    import copy
    from datetime import datetime, timedelta

    copy_df = copy.deepcopy(input_df)

    date_format = '%Y-%m-%d %H:%M:%S'

    train_date = datetime.strptime(train_start, date_format)
    val_date = datetime.strptime(val_start, date_format)
    test_date = datetime.strptime(test_start, date_format)

    one_step = timedelta(minutes = freq_minutes)
    lookback = timedelta(minutes = freq_minutes * ip)

    # Each part stops one sample before the (look-back extended) start of the next one.
    train = copy_df[train_date : val_date - lookback - one_step]
    val = copy_df[val_date - lookback : test_date - lookback - one_step]
    test = copy_df[test_date - lookback : test_last]

    return train, val, test

Scaling

In [None]:
# Scaling 함수 : MinMax / Standard

def data_scaling(X_train, X_val, X_test, method) :
    """Scale the three splits with a scaler fitted on the training split.

    The '공휴일' and '주말' columns are taken out before scaling and put back,
    unscaled, after the first 7 scaled columns. The fitted scaler is also
    published through the module-level global ``std`` (standard) or
    ``min_max`` (min-max) so that other code can invert the scaling.

    Args:
        X_train: Training frame; the scaler is fitted on it.
        X_val: Validation frame.
        X_test: Test frame.
        method: 'standard' or 'min-max'.

    Returns:
        Tuple ``(train, val, test)`` of scaled frames, or ``None`` when
        ``method`` is neither of the two supported values.
    """
    global min_max, std

    drop_list = ['공휴일', '주말']
    drop_idx = 7

    splits = (X_train, X_val, X_test)
    flag_parts = [frame[drop_list] for frame in splits]
    numeric_parts = [frame.drop(columns = drop_list) for frame in splits]

    # Pick the scaler and expose it through the matching global.
    if method == 'standard' :
        from sklearn.preprocessing import StandardScaler

        std = StandardScaler()
        scaler = std
    elif method == 'min-max' :
        from sklearn.preprocessing import MinMaxScaler

        min_max = MinMaxScaler()
        scaler = min_max
    else :
        return None

    train_numeric = numeric_parts[0]
    scaler.fit(train_numeric)
    # Every split is labelled with the training split's column names.
    feature_names = train_numeric.columns

    def rebuild(numeric, flags) :
        # Scale one split and re-insert its unscaled flag columns at position drop_idx.
        scaled = pd.DataFrame(index = numeric.index, data = scaler.transform(numeric), columns = feature_names)
        return pd.concat([scaled.iloc[:, : drop_idx], flags, scaled.iloc[:, drop_idx :]], axis = 1)

    X_train_scaled_df, X_val_scaled_df, X_test_scaled_df = [
        rebuild(numeric, flags) for numeric, flags in zip(numeric_parts, flag_parts)
    ]

    return X_train_scaled_df, X_val_scaled_df, X_test_scaled_df

Make CustomDataSet for PyTorch

In [None]:
# DataSet 만드는 함수

class CustomDataSet(Dataset):
    """Sliding-window dataset built from one frame.

    Each sample is a triple ``(data, label, indep)``:
      * data  - ``ip`` consecutive values of the '현재수요+태양광' column,
      * label - the ``op`` values of the same column that follow directly,
      * indep - the rows of the input window for every column except
                '태양광 발전량(MWh)', '현재수요(MW)' and '현재수요+태양광'.
    Consecutive windows start ``stride`` rows apart. All windows are
    materialised as tensors in the constructor.
    """

    def __init__(self, DataSet, ip, op, stride):
        """Cut ``DataSet`` into windows.

        Args:
            DataSet: Source frame containing the columns named above.
            ip: Input window length in rows.
            op: Output window length in rows.
            stride: Number of rows between the starts of two windows.
        """
        self.DataSet = DataSet
        self.ip = ip
        self.op = op
        self.stride = stride
        self.data_list = []
        self.label_list = []
        self.indep_list = []

        print('Data Pre-processing..')

        total_rows = len(DataSet)
        num_samples = (total_rows - ip - op) // stride + 1

        # Input series and target series are the same column.
        self.input_df = DataSet['현재수요+태양광']
        self.indep_variable = DataSet.drop( columns = [ '태양광 발전량(MWh)', '현재수요(MW)', '현재수요+태양광' ],axis = 1 )
        self.target_df = DataSet['현재수요+태양광']

        for window_no in tqdm(range(num_samples)):

            # [first, boundary) is the input window, [boundary, last) the label window.
            first = stride * window_no
            boundary = first + ip
            last = boundary + op

            history = self.input_df[first : boundary]
            covariates = self.indep_variable[first : boundary]
            future = self.target_df[boundary : last]

            self.data_list.append(torch.Tensor(history.values))
            self.indep_list.append( torch.Tensor( covariates.values ) )
            self.label_list.append(torch.Tensor(future.values))

        print("DONE!!\n")

    def __getitem__(self, index):
        """Return the ``(data, label, indep)`` triple stored at ``index``."""
        return self.data_list[index], self.label_list[index], self.indep_list[index]

    def __len__(self):
        """Return the number of stored windows."""
        return len(self.data_list)

Transformer

In [None]:
# Transformer Model

class TFModel( nn.Module ) :
    """Transformer-encoder model mapping an input window to an output window.

    The scalar input series is embedded, concatenated with the independent
    variables along the feature axis, position-encoded and passed through a
    Transformer encoder. One linear head reduces the features to a single
    value per time step; a second head maps the ``iw`` time steps to ``ow``
    outputs.
    """

    def __init__( self, iw, ow, d_model, nhead, nlayers, dropout = 0.5, num_indep = None ) :
        """Build the layers.

        Args:
            iw: Input window length.
            ow: Output window length.
            d_model: Embedding width of the scalar series (before the
                independent variables are appended).
            nhead: Number of attention heads.
                NOTE(review): presumably has to divide ``d_model + num_indep``
                - confirm against nn.MultiheadAttention.
            nlayers: Number of encoder layers.
            dropout: Dropout ratio, used by both the encoder layers and the
                positional encoding.
            num_indep: Number of independent variables appended to the
                embedding. ``None`` (default) reads the module-level
                ``Setting['num_indep']``, as before.
        """
        if num_indep is None :
            num_indep = Setting['num_indep']

        new_d_model = d_model            # width produced by self.encoder
        d_model = d_model + num_indep    # width seen by the Transformer

        super( TFModel, self ).__init__()
        # self.encoder_layer is the template; nn.TransformerEncoder works on its own copies.
        self.encoder_layer = nn.TransformerEncoderLayer( d_model = d_model, nhead = nhead, dropout = dropout )
        self.transformer_encoder = nn.TransformerEncoder( self.encoder_layer, num_layers = nlayers )
        self.pos_encoder = PositionalEncoding( d_model, dropout )

        # Embeds the scalar input into new_d_model features.
        self.encoder = nn.Sequential(
            nn.Linear( 1, new_d_model//2 ),
            nn.ReLU(),
            nn.Linear( new_d_model//2, new_d_model )
        )

        # Reduces the Transformer output back to one value per time step.
        self.linear = nn.Sequential(
            nn.Linear( d_model, d_model//2 ),
            nn.ReLU(),
            nn.Linear( d_model//2, 1 )
        )

        # Maps the iw time steps to the ow forecast steps.
        self.linear2 = nn.Sequential(
            nn.Linear( iw, ( iw + ow )//2 ),
            nn.ReLU(),
            nn.Linear( ( iw + ow )//2, ow )
        )

    def generate_square_subsequent_mask( self, sz ) :
        """Return a ``(sz, sz)`` float mask: 0.0 on and below the diagonal, -inf above it."""
        allowed = torch.tril( torch.ones( sz, sz ) ) == 1
        mask = torch.zeros( sz, sz ).masked_fill( ~allowed, float( '-inf' ) )

        return mask

    def forward( self, src, srcmask, indeps ) :
        """Run the model.

        Args:
            src: Input series, shape (batch, iw, 1).
            srcmask: Attention mask of shape (iw, iw).
            indeps: Independent variables, shape (batch, iw, num_indep).

        Returns:
            Tensor of shape (batch, ow).
        """
        src = self.encoder( src )                       # (batch, iw, new_d_model)
        src = torch.cat( (src, indeps), dim = 2 )       # (batch, iw, d_model)
        # The positional encoding indexes positions along dim 0 and the encoder is
        # sequence-first as well, so switch to (iw, batch, d_model) BEFORE encoding.
        # Encoding the batch-first tensor would index by batch position instead of
        # by time step.
        src = self.pos_encoder( src.transpose(0,1) )
        output = self.transformer_encoder( src, srcmask ).transpose( 0,1 )
        output = self.linear( output )[:,:,0]           # (batch, iw)
        output = self.linear2( output )                 # (batch, ow)

        return output

In [None]:
# 위치정보 인코딩 클래스

class PositionalEncoding( nn.Module ) :
    """Adds fixed sinusoidal position encodings to a sequence-first tensor, then applies dropout."""

    def __init__( self, d_model, dropout = 0.1, max_len = 5000 ) :
        """Precompute the encoding table.

        Args:
            d_model: Feature width of the tensors that will be encoded.
                Odd widths are supported.
            dropout: Dropout ratio applied after the encoding is added.
            max_len: Number of positions in the precomputed table.
        """
        super( PositionalEncoding, self ).__init__()
        self.dropout = nn.Dropout( p = dropout )

        pe = torch.zeros( max_len, d_model )

        position = torch.arange( 0, max_len, dtype = torch.float ).unsqueeze(1)
        div_term = torch.exp( torch.arange(0, d_model, 2).float() * (-math.log( 10000.0 ) / d_model ) )
        angles = position * div_term    # (max_len, ceil(d_model / 2))

        pe[:, 0::2] = torch.sin( angles )
        # For an odd d_model there is one cosine column fewer than sine columns,
        # so the last frequency is dropped to make the shapes match.
        pe[:, 1::2] = torch.cos( angles[:, : d_model // 2] )

        # Stored as (max_len, 1, d_model) so it broadcasts over dim 1 of the input.
        pe = pe.unsqueeze(0).transpose(0,1)
        self.register_buffer( 'pe', pe )

    def forward( self, x ) :
        """Add the encoding of position ``i`` to ``x[i]`` and apply dropout.

        Args:
            x: Tensor whose dim 0 is the position axis and whose last dim is
                ``d_model``, e.g. (seq_len, batch, d_model).

        Returns:
            Tensor of the same shape as ``x``.
        """
        x = x + self.pe[:x.size(0), :]

        return self.dropout(x)


In [None]:
# mask 행렬 반환 함수

def gen_attention_mask(x) :
    """Return a boolean tensor that is True wherever ``x`` equals 0."""
    return torch.eq(x, 0)

Train & Validation & Test

In [None]:
# 학습 및 검증 함수

def Train_Validation( Settings, train, validation, test ) :
    """Train a TFModel with early stopping and return loss curves and the best model.

    After every epoch the mean loss on the validation and test loaders is
    recorded. Training stops once the validation loss has been worse than the
    best one seen for ``patience_limit`` consecutive epochs. The returned
    model carries the weights of the epoch with the best validation loss.

    Args:
        Settings: Hyper-parameter dict; reads 'ip', 'op', 'd_model', 'n_head',
            'n_layer', 'dropout_TF', 'lr', 'epoch' and 'patience_limit'.
        train: Loader yielding ``(inputs, outputs, indeps)`` training batches.
        validation: Loader of validation batches, same layout.
        test: Loader of test batches, same layout.

    Returns:
        Tuple ``(train_loss_per_epoch, val_loss_per_epoch,
        test_loss_per_epoch, best_model)``; the first three are lists holding
        one mean batch loss (float) per completed epoch.

    Raises:
        ValueError: If one of the loaders yields no batches.
    """
    # Fall back to the CPU when no GPU is present instead of failing outright.
    device = torch.device( 'cuda' if torch.cuda.is_available() else 'cpu' )

    Setting = Settings
    train_loader = train
    val_loader = validation
    test_loader = test

    for loader_name, loader in ( ( 'train', train_loader ), ( 'validation', val_loader ), ( 'test', test_loader ) ) :
        if len( loader ) == 0 :
            raise ValueError( 'The ' + loader_name + ' loader yields no batches.' )

    patience_limit = Setting['patience_limit']
    patience_check = 0
    best_loss = float( 'inf' )
    best_state = None

    model = TFModel( iw = Setting['ip'], ow = Setting['op'], d_model = Setting['d_model'], nhead = Setting['n_head'], nlayers = Setting['n_layer'], dropout = Setting['dropout_TF'] ).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam( model.parameters(), lr = Setting['lr'] )

    # Scheduler: multiplies the learning rate by 0.95 after every epoch.
    scheduler = optim.lr_scheduler.LambdaLR( optimizer = optimizer,
                                             lr_lambda = lambda epoch: 0.95 ** epoch,
                                             last_epoch = -1 )

    def batch_loss( batch ) :
        # Forward one batch through the model and return its loss tensor.
        inputs, outputs, indeps = batch
        src_mask = model.generate_square_subsequent_mask( inputs.shape[1] ).to(device)
        prediction = model( inputs.unsqueeze(-1).float().to(device), src_mask, indeps.float().to(device) )
        return criterion( prediction, outputs.float().to(device) )

    def mean_eval_loss( loader ) :
        # Mean batch loss over a loader, in eval mode and without building autograd graphs.
        model.eval()
        total = 0.0
        with torch.no_grad() :
            for batch in loader :
                total += batch_loss( batch ).item()
        return total / len( loader )

    train_loss_per_epoch = []
    val_loss_per_epoch = []
    test_loss_per_epoch = []

    progress = tqdm( range( Setting['epoch'] ) )
    for _ in progress :

        ### Train
        model.train()
        running_loss = 0.0
        for batch in train_loader :
            optimizer.zero_grad()
            loss = batch_loss( batch )
            loss.backward()
            optimizer.step()
            # .item() detaches the value; summing the tensors would keep every graph alive.
            running_loss += loss.item()

        scheduler.step()
        train_loss_per_epoch.append( running_loss / len( train_loader ) )

        ### Validation & Test
        val_loss = mean_eval_loss( val_loader )
        val_loss_per_epoch.append( val_loss )
        test_loss_per_epoch.append( mean_eval_loss( test_loader ) )

        ### Early Stopping
        if val_loss > best_loss :
            patience_check += 1

            if patience_check >= patience_limit :
                print( '\nEarly Stopped!' )

                break

        else :
            best_loss = val_loss
            # Snapshot the weights: keeping a reference to `model` would just
            # alias the live model and end up holding the final weights.
            best_state = copy.deepcopy( model.state_dict() )
            patience_check = 0

    if best_state is not None :
        model.load_state_dict( best_state )
    best_model = model

    return train_loss_per_epoch, val_loss_per_epoch, test_loss_per_epoch, best_model

Test & Flatten Results

In [None]:
# 테스트 결과 수집 함수

def Get_test_result(model, test_loader, device):
    """Run the model over the test loader and return every prediction as one flat list.

    Args:
        model: Trained model exposing ``generate_square_subsequent_mask`` and
            callable as ``model(src, mask, indeps)``.
        test_loader: Loader yielding ``(inputs, labels, indeps)`` batches; the
            labels are not used here.
        device: Device the batches and masks are moved to.

    Returns:
        List of Python floats: the predictions of all batches, flattened in
        loader order.
    """
    pred_result = []

    # Inference only: switch off dropout and skip autograd bookkeeping.
    model.eval()
    with torch.no_grad():
        for X, _, Z in tqdm(test_loader):

            src_mask = model.generate_square_subsequent_mask( X.shape[1] ).to(device)
            model_pred = model( X.unsqueeze(-1).float().to(device), src_mask, Z.float().to(device) )

            pred_result.extend(model_pred.flatten().tolist())

    return pred_result

Inverse Transformation of Results

In [None]:
# Scale 역변환 함수 : 정규화된 데이터 값을 원래 단위 값으로 변환

def Inverse_scalar_transformation(input_df, scalar, scalar_type) :
    """Convert scaled values back to the original unit.

    The statistics of the LAST feature the scaler was fitted on are used.
    NOTE(review): this assumes the predicted variable is the last scaled
    column - confirm against the column order passed to the scaler.

    Args:
        input_df: Iterable of scaled values.
        scalar: Fitted scaler. For 'min-max' its ``data_min_`` / ``data_max_``
            are read, for 'standard' its ``mean_`` / ``var_``.
        scalar_type: 'min-max' or 'standard'.

    Returns:
        pandas Series with the values in the original unit.

    Raises:
        ValueError: If ``scalar_type`` is not one of the two supported values.
    """
    if scalar_type == 'min-max':
        offset = scalar.data_min_[-1]
        scale = scalar.data_max_[-1] - scalar.data_min_[-1]
    elif scalar_type == 'standard':
        offset = scalar.mean_[-1]
        scale = np.sqrt(scalar.var_[-1])
    else:
        raise ValueError("scalar_type must be 'min-max' or 'standard', got " + repr(scalar_type))

    scaled_values = np.asarray(input_df, dtype = float)

    return pd.Series(offset + scaled_values * scale)

MAPE

In [None]:
# MAPE 계산 함수

def MAPE(y_test, y_pred):
    """Return the mean absolute percentage error, in percent.

    Args:
        y_test: Actual target values.
        y_pred: Predicted target values.
    """
    relative_error = (y_test - y_pred) / y_test
    absolute_share = np.abs(relative_error)

    return np.mean(absolute_share) * 100

### MAIN

In [None]:
# Setting에 따른 Transformer 모델 실행

def Transformer_Scenario( Settings, datas ) :
    """Run the whole pipeline for one configuration and return its results.

    Steps: fix the seeds, cast the leading columns to float32, split by date,
    scale, build the windowed datasets, train with early stopping, predict on
    the test windows, undo the scaling, subtract the solar generation and
    compute the MAPE against the actual demand.

    Args:
        Settings: Hyper-parameter dict (see ``Setting``).
        datas: Datetime-indexed frame holding all model columns.
            NOTE: its leading ``Settings['num_indep']`` columns are cast to
            float32 in place by ``Transform_to_float32``.

    Returns:
        Dict with the keys 'best_model', 'train_loss_per_epoch',
        'val_loss_per_epoch', 'test_loss_per_epoch', 'Y_test',
        'real_test_result' and 'mape'.

    Raises:
        ValueError: If the number of predictions differs from the number of
            rows in the evaluation window.
    """
    # Fall back to the CPU when no GPU is present instead of failing outright.
    device = torch.device( 'cuda' if torch.cuda.is_available() else 'cpu' )

    seed_everything(42)

    Setting = Settings
    data = datas

    ### Transform to float32
    data = Transform_to_float32( data, Setting['num_indep'] )

    ### Split_train_val_test
    data_train, data_val, data_test =  split_train_val_test( data, Setting['train_start'], Setting['val_start'], Setting['test_start'], Setting['test_last'], ip = Setting['ip'] )

    ### Scaling
    data_train_scaled, data_val_scaled, data_test_scaled = data_scaling( data_train, data_val, data_test, Setting['mode'] )

    ### Make CustomDataSet
    train_dataset = CustomDataSet(data_train_scaled, ip = Setting['ip'], op = Setting['op'], stride = Setting['stride'])
    train_loader = DataLoader(train_dataset, batch_size = Setting['batch'], shuffle=False)

    val_dataset = CustomDataSet(data_val_scaled, ip = Setting['ip'], op = Setting['op'], stride = Setting['stride'])
    val_loader = DataLoader(val_dataset, batch_size = Setting['batch'], shuffle = False)

    test_dataset = CustomDataSet(data_test_scaled, ip = Setting['ip'], op = Setting['op'], stride = Setting['stride'])
    test_loader = DataLoader(test_dataset, batch_size = Setting['batch'], shuffle = False)

    ### Train & Validation & Test
    train_loss_per_epoch, val_loss_per_epoch, test_loss_per_epoch, best_model = Train_Validation( Setting, train_loader, val_loader, test_loader )

    ### Test & Flatten Results
    pred_result = Get_test_result( best_model, test_loader, device )

    ### Inverse Transformation of Results
    # data_scaling publishes the fitted scaler through these module-level globals.
    if Setting['mode'] == 'min-max' :
        Scaler = min_max
    else :
        Scaler = std
    real_pred_result = Inverse_scalar_transformation( pred_result, Scaler, Setting['mode'] )

    ### MAPE
    # Evaluate on the configured test period (data_test also contains the
    # look-back rows before test_start, which must be excluded).
    test_start, test_last = Setting['test_start'], Setting['test_last']
    Y_test = data_test['현재수요(MW)'][test_start : test_last]

    Sunpower_test = data_test['태양광 발전량(MWh)'][test_start : test_last]
    Sunpower_test_reset_index = Sunpower_test.reset_index(drop = True)

    # A length mismatch would otherwise be index-aligned silently and fill the result with NaN.
    if len( real_pred_result ) != len( Sunpower_test_reset_index ) :
        raise ValueError(
            'Number of predictions (' + str( len( real_pred_result ) ) + ') does not match the '
            'number of test rows (' + str( len( Sunpower_test_reset_index ) ) + ').'
        )

    # The model predicts demand + solar; subtract solar to recover the demand.
    real_test_result = real_pred_result - Sunpower_test_reset_index
    real_test_result = real_test_result.to_numpy()

    mape = MAPE( Y_test, real_test_result )

    ### Collect results
    result_dic = {
        'best_model' : best_model,
        'train_loss_per_epoch' : train_loss_per_epoch,
        'val_loss_per_epoch' : val_loss_per_epoch,
        'test_loss_per_epoch' : test_loss_per_epoch,
        'Y_test' : Y_test,
        'real_test_result' : real_test_result,
        'mape' : mape
        }

    return result_dic



### 실행

In [None]:
# Run the full pipeline with the settings defined above on the DAE-augmented data.
result_dic = Transformer_Scenario( Setting, data_DAE )

In [None]:
# Bare expression: displays the test MAPE when run as a notebook cell.
result_dic['mape']

### 결과 저장

In [None]:
# Predicted demand, indexed by the timestamps of the actual test values.
submit = pd.DataFrame(
    result_dic['real_test_result'],
    index = result_dic['Y_test'].index,
    columns = ['예측된 현재수요(MW)'],
)
# The path below is a placeholder directory name; adjust it before running.
submit.to_csv('저장경로/submit.csv')