In [1]:
import os
import glob
import warnings

import holidays
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from pandas import DataFrame
from scipy.stats import pearsonr
from sklearn.linear_model import LinearRegression as LR
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_percentage_error
from sklearn.neural_network import MLPRegressor as MLP
from sklearn.model_selection import GridSearchCV


In [3]:
# set random seed, change this value as you prefer
np.random.seed(391)


In [2]:
def predict(data_dir,
            out_dir,
            settings_dict={},
            start_time="2016-01-01 00:00:00",
            end_time="2019-12-31 23:00:00",
            split_time="2018-12-31 23:00:00",
            batch_run=True,
            target_ba_list=None,
            generate_plots=True):
    """Convenience wrapper for the Process class which runs predictive models for each BA input CSV in the input
    directory and creates a summary and comparative figures of R2 and MAPE per BA.

    :param data_dir:                Full path to the directory containing the target CSV files
    :type data_dir:                 str

    :param out_dir:                 Full path to the directory where the outputs are to be written
    :type out_dir:                  str

    :param start_time:              Start-time for analysis, in YYY-MM-DD HH:MM:SS. Default: 2016-01-01 00:00:00
    :type start_time:               str

    :param end_time:                End time for analysis. Default: 2019-12-31 23:00:00
    :type end_time:                 str

    :param split_time:              Timestamp splitting train and test data. Default: 2018-12-31 23:00:00
    :type split_time:               str

    :param batch_run:               Indicating if we want to run the simulations for all BAs, or we handpick the BAs
                                    If batch_run = True, the code will search for all BAs in 'dir'
                                    If batch_run = False, we need to specify which BA to run
    :type batch_run:                bool

    :param target_ba_list:          A list of BA names to run if `batch_run` is False
    :type target_ba_list:           list


    :param generate_plots:          Choice to generate and save plots
    :type generate_plots:           bool

    :return:                        Data frame of BA, R2, MAPE statistics

    """

    proc = Process(
        settings_dict=settings_dict,
        start_time=start_time,
        end_time=end_time,
        split_time=split_time,
        batch_run=batch_run,
        data_dir=data_dir,
        out_dir=out_dir,
        target_ba_list=target_ba_list,
        generate_plots=generate_plots
    )

    return proc.summary_df


In [None]:
def run_linear_model(X: np.ndarray, Y: np.ndarray, X_e: np.ndarray):
    """Training and test data of a linear model. Can be used for either the main model or the residual model.

    :param X:                       Training features
    :type X:                        np.ndarray

    :param Y:                       Training targets
    :type Y:                        np.ndarray

    :param X_e:                     Test features
    :type X_e:                      np.ndarray

    :return                         [0] y_p: predictions over test set
                                    [1] reg.coef_: regression coefficients of a linear model

    """

    # instantiate the linear model
    linear_mod = LR()
    
    # fit the model
    reg = linear_mod.fit(X, Y)

    # get predictions using test features
    y_p = reg.predict(X_e)

    return y_p, reg.coef_


In [None]:
settings_dict = {
    "mlp_hidden_layer_sizes": 256,
    "mlp_max_iter": 500,
    "mlp_validation_fraction": 0.1,
    "mlp_linear_adjustment": True,
    "data_column_rename_dict": {
            "Adjusted_Demand_MWh": "Demand",
            "Pop": "Population",
            "T2": "Temperature",
            "SWDOWN": "Shortwave_Radiation",
            "GLW": "Longwave_Radiation",
            "WSPD": "Wind_Speed",
            "Q2": "Specific_Humidity"
    },
    "expected_datetime_columns": ["Day", "Month", "Year", "Hour"],
    "hour_field_name": "Hour",
    "month_field_name": "Month",
    "x_variables": ["Hour", "Month", "Temperature", "Specific_Humidity", "Wind_Speed", "Longwave_Radiation", "Shortwave_Radiation"],
    "add_dayofweek_xvar": True,
    "y_variables": ["Demand"],
    
    
}

In [None]:
class Dataset:
    """Clean and format input data for use in predictive models.
    
    :param region:                  Indicating region / balancing authority we want to train and test on. 
                                    Must match with string in CSV files.
    :type region:                   str 
    
    :param data_dir:                Full path to the directory that houses the input CSV files.
    :type data_dir:                 str 
    
    :param settings_dict:           Dictionary of settings provided from either defaults or the user
    :type settings_dict:            dict 
    
    """
    
    def __init__(self,
                region: str,
                data_dir: str,
                settings_dict: dict,
                **kwargs):      
        
        self.region = region
        self.data_dir = data_dir
        self.start_time = start_time
        self.end_time = end_time
        
        # get argument defaults or custom settings
        self.expected_datetime_columns = settings_dict.get("expected_datetime_columns")
        self.data_column_rename_dict = settings_dict.get("data_column_rename_dict")
        self.x_variables = settings_dict.get("x_variables")
        self.y_variables = settings_dict.get("y_variables")
        self.mlp_linear_adjustment = settings_dict.get("mlp_linear_adjustment")
        self.hour_field_name = settings_dict.get("hour_field_name")
        self.month_field_name = settings_dict.get("month_field_name")
        self.verbose = settings_dict.get("verbose")
        
        
    def fetch_file(self):
        """Get the input file from the data directory matching the region name."""
        
        file_pattern = os.path.join(self.data_dir, f"{self.region}_*.csv")
        
        # get file list from the data directory using the pattern
        file_list = glob.glob(file_pattern)
        
        # raise error if no files are found
        if len(file_list) == 0:
            msg = f"No data files were found for region '{self.region}' in directory '{self.data_dir}'.
            raise FileNotFoundError(msg)
            
        # raise error if more than one file was found
        if len(file_list) > 1:
            msg = f"More than one data files were found for region '{self.region}' in directory '{self.data_dir}'.
            raise ValueError(msg)
            
        # log feedback to user if desired
        if self.verbose:
            print(f"Processing file:  {file_list[0]}")
            
        # return only the file path
        return file_list[0]
    
    def format_data(self) -> pd.DataFrame:
        """Read and format the input data file.  Filter data by user provided date range and sort in 
        ascending order by the timestamp.
        
        :return:                 Formatted data frame
        
        """
        
        # get target file path
        file_path = self.fetch_file()
        
        # read into a pandas data frame
        df = pd.read_csv(file_path)
        
        # rename columns to default or user desired 
        df.rename(columns=self.data_column_rename_dict, inplace=True)
        
        # generate datetime timestamp field
        df["Datetime"] = pd.to_datetime(df[self.expected_datetime_columns])
        
        # filter by date range
        df = df.loc[(df["Datetime"] >= self.start_time) & (df["Datetime"] <= self.end_time)]
        
        # sort values by timestamp
        df.sort_values(by=["Datetime"], inplace=True)
        
        # reset and drop index
        df.reset_index(drop=True, inplace=True)
        
        return df
    
    def apply_sine_for_linear_model(self, df: pd.DataFrame) -> pd.DataFrame:
        """Apply the sine function to the hour and month fields for use in a linear model as predictive variables.
        
        """
        
        # if a linear model will be ran and an hour field is present in the data frame apply the sine function
        if (self.mlp_linear_adjustment) and (self.hour_field in df.columns):
            df[self.hour_field_name] = np.sin(df[self.hour_field_name] * np.pi / 24)            

        # if a linear model will be ran and an month field is present in the data frame apply the sine function
        if (self.mlp_linear_adjustment) and (self.hour_field in df.columns):
            df[self.month_field_name] = np.sin(df[self.month_field_name] * np.pi / 12)  
        
        return df
    
    def apply_dayofweek(self: df: pd.DataFrame) -> pd.DataFrame:
        """Add a field for weekday to the data frame."""
        
        # create an array of day of the week values from the timestamp; 0 = Monday ... 6 = Sunday
        df["Weekday"] = df["Datetime"].dt.dayofweek.values
        
        # adjust to specify weekdays as 1
        df["Weekday"] = np.where(df["Weekday"] <= 4, 1, df["Weekday"])
        
        # TODO:  start here
        
#         # specify weekday vdayofweek 0: monday, 6: sunday
#         weekday = np.zeros_like(dayofweek)
        
#         weekday[dayofweek <= 4] = 1
#         df["Weekday"] = weekday

#         # Create a day of week variable
#         day_of_week = np.zeros((dayofweek.shape[0], 7))
#         for d in range(7):
#             tmp_val = np.zeros_like(dayofweek)
#             tmp_val[dayofweek == d] = 1
#             day_of_week[:, d] = tmp_val

#         # concat day of week with df
#         df_dayofweek = pd.DataFrame(day_of_week)

#         # week list
#         day_list = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
#         df_dayofweek.columns = day_list
#         df = pd.concat((df, df_dayofweek), axis=1)

#         # federal holidays added as a new feature
#         df = self.remove_federal_holidays(df=df)
        
        
        
        
        
        

In [27]:
df = pd.DataFrame({'day': ["28","29"], 'month': ["1","1"], 'year': ['2022', '2022']})

pd.to_datetime(df[['day', 'month', 'year', 'hour']])

KeyError: "['hour'] not in index"

In [22]:
df.rename(columns={'c': "ab", "b": "BB"}, inplace=True)

df

Unnamed: 0,a,BB
0,1,1
1,2,2
2,3,3


In [None]:
def run_mlp_model(X: np.ndarray, 
                  Y: np.ndarray, 
                  X_e: np.ndarray, 
                  settings_dict: dict,
                  **kwargs) -> np.ndarray:
    """Trains the MLP model. also calls the linear residual model to adjust for population correction.

    :param X:                       Training features
    :type X:                        np.ndarray

    :param Y:                       Training targets
    :type Y:                        np.ndarray

    :param X_e:                     Test features
    :type X_e:                      np.ndarray
    
    :param settings_dict:           Dictionary of settings provided from either defaults or the user
    :type settings_dict:            dict 

    :return:                        y_p: np.ndarray -> predictions over test set

    """

    # get argument defaults or custom settings
    mlp_hidden_layer_sizes = settings_dict.get("mlp_hidden_layer_sizes")
    mlp_max_iter = settings_dict.get("mlp_max_iter")
    mlp_validation_fraction = settings_dict.get("mlp_validation_fraction")
    mlp_linear_adjustment = settings_dict.get("mlp_linear_adjustment")

    # instantiate the MLP model
    mlp = MLP(hidden_layer_sizes=mlp_hidden_layer_sizes, 
              max_iter=mlp_max_iter, 
              validation_fraction=mlp_validation_fraction)

    # fit the model to data matrix X (training features) and target Y (training targets)
    mlp.fit(X, Y)

    # predict using the multi-layer perceptron model using the test features
    y_p = mlp.predict(X_e)
    
    # if the user desired, adjust the prediction using a linear residual model
    if mlp_linear_adjustment:
        
        # predict on training features
        y_tmp = mlp.predict(X)
        
        # compute the residuals in the training data
        epsilon = Y - y_tmp
        
        # prepare data for the residual model
        # TODO: produce Xres_t, Xres_e

        # train the linear model to find residuals
        epsilon_e, regression_coeff = run_linear_model(X=Xres_t , Y=epsilon, X_e=Xres_e)
        
        # apply the adjustment
        y_p += epsilon_e
        
    return y_p
        

        

In [16]:
def test(x, y, **kwargs):
    
    print(x, y, kwargs)
    
    settings_dict = kwargs.get('settings_dict')
    
    print(settings_dict.get('hidden_layer_sizes'))
    

In [17]:
test(1, 2, settings_dict={'hidden_layer_sizes': 256, 'max_iter': 500})

1 2 {'settings_dict': {'hidden_layer_sizes': 256, 'max_iter': 500}}
256
