In [16]:
import pandas as pd
import numpy as np
import psycopg2 
from tqdm import tqdm
from sklearn.linear_model import LinearRegression
from skforecast.ForecasterAutoreg import ForecasterAutoreg
from sklearn.preprocessing import StandardScaler

In [66]:
class Benchmarks:
    """Rolling-origin evaluation of baseline forecasts for a ``load`` series.

    ``fit_and_predict`` walks forecast origins through the tail of the data in
    24-row steps, produces a 48-step forecast from every origin with each
    method named in ``forecasts`` and accumulates per-horizon MAE, MAPE and
    RMSE.  The row labels used in the reports ('H1'..'H48', 'First 24 hours',
    ...) treat one row as one hour.

    Attributes:
        exogenous_columns: column names read from the exogenous-variable file.
        MAE, MAPE, RMSE: ``dict`` mapping forecast name -> list of 51 numbers
            (48 per-horizon values followed by three summary slots), or
            ``None`` before the first ``reset_errors`` call.
        forecaster: the fitted linear autoregressive model, or ``None``.
    """

    forecasts = ['naive_forecast', 'seasonal_forecast_daily', 'seasonal_forecast_weekly',
                 'seasonal_forecast_yearly', 'random_walk_forecast', 'linear_regression']

    # Number of rows of history used to fit the linear model (180 days * 24).
    _LINEAR_TRAIN_ROWS = 24 * 180
    # The linear model is refitted on every 14th forecast origin.
    _RETRAIN_EVERY = 14

    def __init__(self, exog_file="exog.txt"):
        """Load the exogenous column names and initialise empty state.

        Args:
            exog_file: path of a text file with one column name per line.
                Defaults to ``"exog.txt"`` in the working directory.
        """
        self.exogenous_columns = self.read_file(exog_file)
        self.MAE = None
        self.MAPE = None
        self.RMSE = None
        self.forecaster = None

    def fit_and_predict(self, data, test_size=0.1):
        """Run every benchmark over the test tail of ``data`` and print the errors.

        Args:
            data: DataFrame containing a ``'load'`` column and every column
                listed in ``self.exogenous_columns``.
            test_size: fraction of rows (taken from the end) reserved for
                evaluation.

        Side effects:
            Resets and refills ``self.MAE`` / ``self.MAPE`` / ``self.RMSE``,
            may refit ``self.forecaster`` and prints the summary table.
        """
        self.reset_errors()

        end_of_train = int(len(data) * (1 - test_size))
        # One origin per 24 rows; the "- 1" keeps the last 48-row window
        # entirely inside ``data``.
        num_forecasts = int((len(data) - end_of_train) // 24 - 1)

        for f in tqdm(range(0, num_forecasts)):

            frontier = end_of_train + 24 * f
            train = data[:frontier]
            # Clamp at 0: a negative start would wrap around and select rows
            # counted from the end of ``data`` instead of the recent history.
            linear_start = max(0, frontier - self._LINEAR_TRAIN_ROWS)
            train_linear = data[linear_start:frontier]
            test = data[frontier:frontier + 48]

            y_hat = test[['load']].copy()
            y_hat['naive_forecast'] = self.naive_forecast(train['load'])
            y_hat['seasonal_forecast_daily'] = self.seasonal_forecast_daily(train['load'])
            y_hat['seasonal_forecast_weekly'] = self.seasonal_forecast_weekly(train['load'])
            y_hat['seasonal_forecast_yearly'] = self.seasonal_forecast_yearly(train['load'])
            y_hat['random_walk_forecast'] = self.random_walk_forecast(train['load'])
            y_hat['linear_regression'] = self.linear_regression(
                train_linear['load'],
                train_linear[self.exogenous_columns],
                test[self.exogenous_columns],
                f % self._RETRAIN_EVERY == 0,
            )

            actual_magnitude = y_hat['load'].abs()
            for forecast in self.forecasts:
                abs_err = (y_hat[forecast] - y_hat['load']).abs()
                # Each origin contributes 1/num_forecasts, so after the loop
                # the lists hold means over all origins (RMSE holds the mean
                # squared error until ``average_errors`` takes the root).
                self.MAE[forecast][0:48] = np.add(self.MAE[forecast][0:48],
                                                  abs_err / num_forecasts)
                self.RMSE[forecast][0:48] = np.add(self.RMSE[forecast][0:48],
                                                   abs_err ** 2 / num_forecasts)
                # Divide by |actual| so a negative actual cannot flip the sign
                # of a percentage error.
                self.MAPE[forecast][0:48] = np.add(self.MAPE[forecast][0:48],
                                                   abs_err / (actual_magnitude * num_forecasts))

        self.average_errors()
        print(self.get_errors())

    def average_errors(self):
        """Fill the three summary slots (indices 48, 49, 50) of every error list.

        Index 48 is the mean over horizons 1-24, 49 the mean over horizons
        25-48 and 50 the mean over all 48 horizons.  The first 48 RMSE entries
        are replaced by their square root first, so this must be called once
        per ``reset_errors`` / accumulation cycle.
        """
        for forecast in self.forecasts:
            # Convert accumulated mean squared errors into RMSE per horizon.
            self.RMSE[forecast][0:48] = np.sqrt(self.RMSE[forecast][0:48])

            for errors in (self.MAE[forecast], self.MAPE[forecast], self.RMSE[forecast]):
                errors[48] = np.mean(errors[0:24])
                errors[49] = np.mean(errors[24:48])
                errors[50] = np.mean(errors[0:48])

    def get_errors(self, s=3):
        """Return one row of each error table, stacked under MAE/MAPE/RMSE keys.

        Args:
            s: selects the row at position ``s + 47``; 1 -> 'First 24 hours',
                2 -> 'Second 24 hours', 3 (default) -> 'Total'.

        Returns:
            DataFrame with one column per forecast and a two-level index
            (metric name, row label).
        """
        labels = (['H' + str(i) for i in range(1, 49)]
                  + ['First 24 hours', 'Second 24 hours', 'Total'])
        selected = slice(s + 47, s + 48)

        frames = [pd.DataFrame(errors, index=labels).iloc[selected]
                  for errors in (self.MAE, self.MAPE, self.RMSE)]

        return pd.concat(frames, keys=["MAE", "MAPE", "RMSE"])

    def naive_forecast(self, df):
        """Return the last observed value (used for every forecast step)."""
        return df.iloc[-1]

    def seasonal_forecast_daily(self, df):
        """Return the last 24 observations, in order, repeated twice (48 values)."""
        # i % 24 runs 23..0 twice, i.e. positions -24..-1 followed by -24..-1.
        return [df.iloc[-(i % 24 + 1)] for i in range(47, -1, -1)]

    def seasonal_forecast_weekly(self, df):
        """Return the 48 observations that start 168 rows before the end of ``df``.

        ``df`` needs at least 168 rows for the result to have 48 values.
        """
        return df.iloc[-24 * 7:-24 * 7 + 48].values

    def seasonal_forecast_yearly(self, df):
        """Return the 48 observations that start 8760 rows before the end of ``df``.

        ``df`` needs at least 8760 rows for the result to have 48 values.
        """
        return df.iloc[-24 * 365:-24 * 365 + 48].values

    def random_walk_forecast(self, df):
        """Return a 48-step random-walk-with-drift forecast.

        The drift is the average change per step over the history that was
        actually supplied, ``(last - first) / (len(df) - 1)``; step ``h`` of
        the forecast is ``last + h * drift``.  A single-point history has no
        measurable drift and yields a flat forecast.

        Returns:
            numpy array with 48 values.
        """
        steps_observed = len(df) - 1
        if steps_observed > 0:
            drift = (df.iloc[-1] - df.iloc[0]) / steps_observed
        else:
            drift = 0.0
        return self.naive_forecast(df) + drift * np.arange(1, 49)

    def linear_regression(self, train_y, train_x, test, retrain=False):
        """Forecast 48 steps with a lagged linear model plus exogenous inputs.

        Args:
            train_y: target history; used for fitting and passed to
                ``predict`` as its second positional argument.
            train_x: exogenous variables aligned with ``train_y``.
            test: exogenous variables for the 48 steps being predicted.
            retrain: when true, build and fit a fresh forecaster first.  A
                forecaster is also fitted when none exists yet, so the first
                call works regardless of this flag.

        Returns:
            Whatever ``ForecasterAutoreg.predict`` returns for 48 steps.
        """
        if retrain or self.forecaster is None:
            self.forecaster = ForecasterAutoreg(
                regressor        = LinearRegression(),
                lags             = [1, 2, 24, 25, 48, 49, 72, 73, 96, 97, 120, 121, 144, 145, 167, 168],
                transformer_y    = StandardScaler(),
                transformer_exog = None
                )

            self.forecaster.fit(y=train_y, exog=train_x)

        # NOTE(review): positional arguments are presumably
        # (steps, last_window, exog) - confirm against the installed skforecast.
        return self.forecaster.predict(48, train_y, test)

    def reset_errors(self):
        """Replace all three error tables with fresh zero-filled lists of length 51."""
        self.MAE = {name: [0] * 51 for name in self.forecasts}
        self.MAPE = {name: [0] * 51 for name in self.forecasts}
        self.RMSE = {name: [0] * 51 for name in self.forecasts}

    def read_file(self, filename):
        """Read one entry per line from ``filename``.

        Leading/trailing whitespace is stripped and blank lines are skipped so
        that a trailing newline cannot produce an empty column name.

        Returns:
            list of the non-empty, stripped lines in file order.
        """
        with open(filename, 'r', encoding='utf-8') as file:
            stripped_lines = (line.strip() for line in file)
            return [entry for entry in stripped_lines if entry]

    def save_errors(self, filename):
        """Write the default ``get_errors()`` table to ``filename`` as CSV."""
        # NOTE(review): index=False drops the (metric, row label) index, so the
        # three rows are only identifiable by their MAE, MAPE, RMSE order.
        self.get_errors().to_csv(filename, encoding='utf-8', index=False, header=True)
        print("Errors saved successfully")