In [None]:
# Import libraries
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from joblib import Parallel, delayed

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import statsmodels.api as sm
import statsmodels.formula.api as smf
import statsmodels.stats.diagnostic as dg
import statsmodels.stats.stattools as st

import scipy as sp

In [None]:
# SparkSession
spark = SparkSession.builder.getOrCreate()

### Functions

In [None]:
def ITS_model_naive(data: pd.DataFrame,
                    formula: str) -> pd.DataFrame:
    '''
    Main focus: Implementing naive (OLS) regression model, assessing coefficients, significance, and checking CLRM assumptions
    Approach: Utilize joblib's Parallel and delayed functions to concurrently process each location's time series data, applying OLS regression, and aggregate the key statistical metrics into a DataFrame
    '''
    # Create an empty DataFrame to store all statistical metrics for all location_id 
    results_df = pd.DataFrame()

    # Define inner function to apply OLS regression and get statistical metrics
    def process_location(location_id: str,
                         group: pd.DataFrame) -> pd.DataFrame:
        
        # Define and fit naive OLS regression model
        naive = smf.ols(formula = formula, data=group).fit()
        
        # Create statistical metrics dataframe
        results = pd.DataFrame([{
            'location_id': location_id,
            'coef': naive.params, # Linear coeff (aka betas) that min least square criterion
            'pvalues': naive.pvalues, # two-tailed p-values for t-stats of params
            #'Durbin_Watson': st.durbin_watson(naive.resid), # Durbin-Watson statistic: [0;4] --> if ~0 then positive serial correlation (autocorrelation), otherwise (~4) then negative serial correlation (autocorrelation)
            'BreuschGodfrey': dg.acorr_breusch_godfrey(res=naive, nlags=7)[3], # Breusch-Godfrey test (testing serial correlation): check persistance over weeks (daily observations)
            'HeteroscedasticityARCH': dg.het_arch(resid=naive.resid, nlags=7)[3], # Heteroscedasticity ARCH test (testing heteroscedasticity): check persistence over weeks (daily observations)
            'JarqueBera': st.jarque_bera(naive.resid)[1] # Jarque-Bera test (testing normality distribution of errors)
        }])
        
        # Return statistical metrics for current location
        return results
    
    # Parallelize the processing of each location
    processed_results = Parallel(n_jobs=-1)(
        delayed(process_location)(location_id, group)
        for location_id, group in data.groupby('location_id')
    )
    
    # Concatenate results from parallel processing
    results_df = pd.concat(processed_results, ignore_index=True)
    
    # Return the statistical metrics for each locaion_id
    return results_df

In [None]:
def add_new_col(d_temp: pd.DataFrame) -> None:
    '''
    Main focus: Incorporating new columns related to interventions and days
    Approach: Utilizing the date and intervention columns, create intervention2 to indicate intervention occurrences, days to represent the cumulative days, and intervention_day to accumulate intervention days
    '''
    # Return existing DataFrame with three new col
    return d_temp.assign(intervention2 = lambda d: (d['date'] >= d['intervention']).astype(int),
                         days = np.arange(len(d_temp)) +1,
                         intervention_day = lambda d: d['intervention2'].cumsum())

### Interrupted Time Series model

In [None]:
# Load jersey data
table_name = 'eliqdatalake.playground.jersey_data'
query = f"SELECT * FROM {table_name}"
data = spark.sql(query).toPandas().sort_values(['location_id', 'date']).reset_index(drop=True)

In [None]:
# TODO: Go back to preprocessing phase and add condition to check the presence of zero-values --> ex 1840990 ==> DONE
d = [loc_id for loc_id, group in data.groupby('location_id') if (group['energy']==0).sum()/len(group) >= 0.2]
for location_id in d:
    data.drop(data.query(f'location_id=={location_id}').index, inplace=True)
data.reset_index(drop=True, inplace=True)

#### Naive (OLS) linear regression model

**Causal Inference problem**: Figuring out the effect of the usage of Eliq’s solution on energy consumption - Do Eliq’s energy management solutions cause changes (decrease) in energy consumptions?

**Goal**: Eliq's energy management solutions decrease energy consumption.

**Model**: (Naive OLS) linear regression model

**Estimation method**: Ordinary Least Squares (OLS) --> Minimize the sum of squared errors by using residual sum of squares (RSS)

**Regression formula**: Look at the The event book, chapter 17 (https://theeffectbook.net/ch-EventStudies.html#event-studies-with-regression)

**Requirement before fitting the model** : stationarity

**Statistical metrics**: Checking requirements after fitting the model, ie the goodness of results:
* *Breusch-Godfrey test*: testing for serial correlation (autocorrelation); hypothesis test:\
H_0: No correlation in the errors of the model\
H_1: Correlation in the errors of the model\
NB. If rejecting H_0, then bad idea to use OLS as estimation method because it violates the CLRM assumption
* *Heteroscedasticity ARCH test*: testing for homoscedasticity (ie presence of volatility); hypothesis test:\
H_0: No dependence in the variance of the error at time t with the error at time t-1, t-2,...,t-n (homoscedasticity)\
H_1: Dependence in the variance of the error at time t with the error at time t-1, t-2,...,t-n (heteroscedasticity)\
NB. If rejecting H_0, then there are two solutions (1) GLS (theoretical solution for panel data) and (2) GARCH model (where heteroscedasticity arises in the presence of volatility in the shock)
* *Jarque-Bera test*: testing normality distributions of errors; hypothesis test:\
H_0: Normality\
H_1: Non-normality

In [None]:
data = data.groupby('location_id').apply(add_new_col)
data

To preserve the previous behavior, use

	>>> .groupby(..., group_keys=False)


	>>> .groupby(..., group_keys=True)
  data = data.groupby('location_id').apply(add_new_col)


Unnamed: 0,location_id,date,intervention,energy,temp,cloudcover,humidity,precip,windspeed,brightness_s2s,energyprice,energyprice_percChange,energyprice_change,energy_diff,temp_diff,cloudcover_diff,humidity_diff,precip_diff,windspeed_diff,brightness_s2s_diff,energyprice_diff,energyprice_percChange_diff,energyprice_change_diff,intervention2,days,intervention_day
0,1420037,2020-09-20,2021-09-20,9429.0,18.6,17.0,86.0,0.0,12.2,0.41,43.71,0.000,0.00,9429.0,18.6,17.0,86.0,0.0,12.2,0.41,43.71,0.000,0.00,0,1,0
1,1420037,2020-09-21,2021-09-20,10173.0,18.9,11.0,84.0,0.0,12.5,0.41,43.71,0.000,0.00,744.0,0.3,-6.0,-2.0,0.0,0.3,0.00,0.00,0.000,0.00,0,2,0
2,1420037,2020-09-22,2021-09-20,7932.0,17.1,42.0,93.0,0.0,13.5,0.40,43.71,0.000,0.00,-2241.0,-1.8,31.0,9.0,0.0,1.0,-0.01,0.00,0.000,0.00,0,3,0
3,1420037,2020-09-23,2021-09-20,7901.0,15.3,38.0,84.0,0.1,28.0,0.40,43.71,1.580,0.68,-31.0,-1.8,-4.0,-9.0,0.1,14.5,0.00,0.00,1.580,0.68,0,4,0
4,1420037,2020-09-24,2021-09-20,8638.0,12.5,36.0,77.0,0.2,40.4,0.40,42.96,-1.716,-0.75,737.0,-2.8,-2.0,-7.0,0.1,12.4,0.00,-0.75,-3.296,-1.43,0,5,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3663036,6404010,2023-04-22,2022-04-26,15038.0,10.3,52.0,86.0,0.0,18.0,0.52,98.76,0.000,0.00,7803.0,1.4,14.0,9.0,0.0,3.6,0.00,0.00,0.373,0.37,1,727,362
3663037,6404010,2023-04-23,2022-04-26,19145.0,10.1,69.0,92.0,0.0,23.4,0.52,98.76,0.000,0.00,4107.0,-0.2,17.0,6.0,0.0,5.4,0.00,0.00,0.000,0.00,1,728,363
3663038,6404010,2023-04-24,2022-04-26,8908.0,9.9,47.0,85.0,0.0,22.8,0.53,97.84,-0.932,-0.92,-10237.0,-0.2,-22.0,-7.0,0.0,-0.6,0.01,-0.92,-0.932,-0.92,1,729,364
3663039,6404010,2023-04-25,2022-04-26,9144.0,8.8,46.0,75.0,0.0,21.2,0.53,97.92,0.082,0.08,236.0,-1.1,-1.0,-10.0,0.0,-1.6,0.00,0.08,1.014,1.00,1,730,365


In [None]:
data.drop(['date',
         'intervention',
         'energy',
         'temp',
         'cloudcover',
         'humidity',
         'precip',
         'windspeed',
         'brightness_s2s',
         'energyprice',
         'energyprice_percChange',
         'energyprice_change',
         'temp_diff',
         'cloudcover_diff',
         'humidity_diff',
         'precip_diff',
         'windspeed_diff',
         'brightness_s2s_diff',
         'energyprice_diff',
         'energyprice_percChange_diff',
         'energyprice_change_diff'], axis=1, inplace=True)
data

Unnamed: 0,location_id,energy_diff,intervention2,days,intervention_day
0,1420037,9429.0,0,1,0
1,1420037,744.0,0,2,0
2,1420037,-2241.0,0,3,0
3,1420037,-31.0,0,4,0
4,1420037,737.0,0,5,0
...,...,...,...,...,...
3663036,6404010,7803.0,1,727,362
3663037,6404010,4107.0,1,728,363
3663038,6404010,-10237.0,1,729,364
3663039,6404010,236.0,1,730,365


In [None]:
#data = data.reset_index(drop=True)

In [None]:
new_res = ITS_model_naive(data, formula='energy_diff ~ intervention2 + days + intervention_day')
new_res

Unnamed: 0,location_id,coef,pvalues,BreuschGodfrey,HeteroscedasticityARCH,JarqueBera
0,1420037,Intercept 76.187701 intervention2 ...,Intercept 0.756855 intervention2 ...,3.541880e-62,3.542960e-17,1.798166e-08
1,1420043,Intercept 109.839591 intervention2 ...,Intercept 0.797421 intervention2 ...,1.541391e-61,7.218846e-14,4.924716e-07
2,1420059,Intercept 161.392835 intervention2 ...,Intercept 0.575253 intervention2 ...,7.651353e-64,3.554185e-08,4.045508e-03
3,1420066,Intercept 120.199037 intervention2 ...,Intercept 0.584865 intervention2 ...,1.853441e-60,4.472069e-20,2.476077e-129
4,1420072,Intercept 302.617296 intervention2 ...,Intercept 0.572943 intervention2 ...,3.477909e-15,3.799656e-13,7.851131e-23
...,...,...,...,...,...,...
5006,5481719,Intercept 7.963326 intervention2 ...,Intercept 0.728519 intervention2 ...,3.857263e-71,9.798466e-85,0.000000e+00
5007,5651511,Intercept 62.837769 intervention2 ...,Intercept 0.836161 intervention2 ...,8.325333e-44,6.661834e-03,3.521022e-48
5008,6061367,Intercept 257.401114 intervention2 ...,Intercept 0.617958 intervention2 ...,8.638287e-42,2.141357e-21,2.078568e-186
5009,6377801,Intercept 193.052807 intervention2 ...,Intercept 0.784180 intervention2 ...,3.363103e-66,2.272012e-14,5.063999e-08


In [None]:
# Check goodness of results and statistical metrics for naive OLS models
for location_id, group in new_res.groupby('location_id'):
    # Get coefficient for intervention2 --> estimates of energy consumption effect in percentage
    coeff = round(group['coef'].iloc[0]['intervention2'], 2)
    # Get pvalues for intervention2 --> testing statistical significance of coeff
    pvalue = round(group['pvalues'].iloc[0]['intervention2'], 2)
    # Get p-value of Breusch-Godfrey test
    bg = round(group['BreuschGodfrey'].iloc[0], 2)
    # Get p-value of Heteroscedasticity ARCH test
    h = round(group['HeteroscedasticityARCH'].iloc[0], 2)
    # Get p-value of Jarque-Bera test
    jb = round(group['JarqueBera'].iloc[0], 2)
    # Checking condition for hypothesis tests
    if bg > 0.05 or h > 0.05 or jb > 0.05:
        if pvalue < 0.05:
            print(f"{location_id}: coefficient for intervention2 is {coeff}, its pvalue is {pvalue}, Breusch-Godfrey is {bg}, Heteroscedasticity ARCH is {h} and Jarque-Bera is {jb}")

**Conclusion naive OLS linear regression model**

After testing all locations for serial correlation, heteroscedasticity, and normality distributions of errors, it is evident that OLS is not an appropriate estimation method in our case. The reasons are as follows:
* *Breusch-Godfrey test*: For all the locations, we reject H_0, indicating the presence of serial correlation (autocorrelation) in the errors of the models. This violates CLRM assumption (3): Cov(ui, uj) = 0 
* *Heteroscedasticity ARCH test*: For all locations, we rejet H_0, suggesting dependence in the variance of the error at time t with the error at time t-1, t-2,..., t-n (heteroscedasticity). This violates CLRM (2): Var(ut) = σ^2 < ∞
* *Jarque-Bera test*: For most of the locations, we reject H_0, indicating that the residuals are not normally distributed, thus are still not known. This violates CLRM (5): ut ∼ N (0, σ^2)

*Consequences* of these violations are that (i) the coefficient estimates are wrong, (ii) the associated standard errors are wrong and (iii) the distribution that we assumed for the test statistics will be inappropriate.

*Potential solutions*:
1. Given serial correlation and heteroscedasticity, two potential solutions are:\
  a. GLS (Generalised least squares) --> Only theoretical solution\
  b. GARCH model --> Primarily designed for financial time series data, it captures autocorrelation and heteroscedasticity in residuals. However, it is not tested due to being outside the project scope
2. Bayesian Structural Time Series --> A paradigm shift employing a Bayesian estimation method

Why not try ARIMA? 
* ARIMA model only consider time-varying mean and constant conditional variance, thus failing to capture time-varying volatility. In other words, it cannot model heteroscedasticity.

In summary, OLS as estimation method for naive linear regression model is wrong since it violates CLRM assumptions; attempting to test predicitons using such model would be misguided, given its inherent violantions of the assumptions. Both ARIMA and GARCH models employ MLE as estimation method but they are not suitable for our task. The former cannot model heteroscedasticity while the latter falls beyond the project scope. Thus, the shift from (naive) linear models to Bayesian structural framework is justified by not only potential poor prediction resuts but mainly because the former fails to support its underlying assumptions.