# The preprocessing pipeline

### The goal of the preprocessing pipeline

The data preprocessing pipeline that is presented in this chapter assumes that energy consumption and outdoor air temperature is the only data that is available for M&V. We consider an energy consumption and outdoor air temperature dataset validated if:
    
    
1. **There are no duplicate values in the dataset’s timestamps**. Duplicate timestamps are treated separately for energy consumption and for temperature data. In both cases, if the range of the energy consumption or temperature values that share a timestamp is short – according to a user-defined threshold– they are replaced by their average. Otherwise, they are treated as missing values.


2. **There are no missing values in the dataset’s timestamps**. If there are missing timestamps, they are added and the respective data is treated as missing values.


3. **Potential outliers are identified and marked**. Outlier detection is carried out separately for energy consumption and for temperature data. 


4. **There is enough data available for the energy consumption of the building under study**. Baseline energy consumption data must cover at least one full year before any energy efficiency intervention. In addition, and adopting the data requirements of the [CalTRACK](https://www.caltrack.org/) set of methods, data must be available for over 90% of hours in each calendar month – ***after excluding the potential outliers***.


5. **There are no missing values in the outdoor air temperature data**. If temperature data is missing, the missing values are imputed. The outdoor air temperature changes smoothly from one hour to the next, so interpolating over a 6-hour window around a missing observation is a sensible approach for imputation. This is in line with CalTRACK's requirement that temperature data may not be missing for more than six (6) consecutive hours.


6. **There are no missing values in the energy consumption data**. Missing values for energy consumption data do not pose a problem when training the predictive baseline model but they may lead to daily consumption profiles that are mistakenly regarded as unusual when we search for common and uncommon patterns in the data. Accordingly, the proposed workflow imputes the energy consumption data for the identification of patterns in the daily consumption, but ***does not include*** the imputed values in the dataset that is used for the predictive model’s training.

The steps of the preprocessing pipeline are summarized in the figure below:

<img src="html/figures/01.png" width="500"/>


In [1]:
%matplotlib inline
%load_ext autoreload
%autoreload 2

In [2]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import scipy.stats as stats
import matplotlib.pyplot as plt

from datetime import date, datetime, timedelta

pd.plotting.register_matplotlib_converters()

In [3]:
from eensight.io import DataCatalog
from eensight.utils import load_configuration, fit_pdf
from eensight.prediction.linear_models import seasonal_predict

from eensight.preprocessing import linear_impute, iterative_impute
from eensight.preprocessing import validate_data, check_column_values_not_null 
from eensight.preprocessing import global_filter, global_outlier_detect, local_outlier_detect


### Load the data catalog for Demo Building #1

In [4]:
cfg = load_configuration(catalog='demo_site_01')

In [7]:
catalog = DataCatalog.from_config(catalog=cfg.catalog, 
                                  data_dir=cfg.data_dir, 
                                  ml_stages='train')

### Select the consumption dataset from the demo catalog

In [9]:
consumption = catalog.load('consumption_train')

### Make sure that the data is in the appropriate format

In [10]:
consumption = validate_data(consumption, col_name='consumption', date_col_name='timestamp')

In [None]:
with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))

    consumption['consumption'].plot(ax=ax, alpha=0.5)
    ax.set_xlabel('Hours')

### Outlier identification

The proposed approach for outlier identification is outlined next:

#### Step 1: Global filter

The first step screens for non-physically plausible values as well as unlikely values in the data. 

For power consumption data, negative and zero values are filtered out. 

For both consumption and temperature data, values that are at least 10 times larger than the median value are also removed. The threshold of ten times the median value aims at removing the most extreme outliers. 

Furthermore, long streaks of constant values are filtered out as well (here *long* is defined in hours by `no_change_window`).


In [12]:
consumption['consumption'] = global_filter(consumption['consumption'], 
                                           no_change_window=3,
                                           allow_zero=False, 
                                           allow_negative=False
)

#### Step 2: Seasonal filter

The second step captures the seasonal cycle of the data through a trend and seasonality decomposition approach that utilizes a Fourier series expansion of the form:

$$y(t)=\alpha+bt+\sum_{n=1}^{N} (\alpha_n\cos(\frac{2πnt}{P}) + b_n\sin(\frac{2πnt}{P}))$$

where:

$\alpha$    is the offset of the linear trend

$b$    is the slope of the linear trend

$t$    is the day since a pre-specified epoch. For hourly data, $t$ will take decimal number values.

$N$    is a parameter that controls the flexibility of the expansion. Suggested values are N=4 for daily seasonality, N=10 for yearly seasonality (see [Taylor S. J. and Letham B. (2018) "Forecasting at scale," The American Statistician 72(1), pp. 37-45](https://peerj.com/preprints/3190/))

$P$    is the length of the seasonality: P=1 for daily seasonality, P=365.25 for yearly seasonality. For energy consumption data, we fit a different daily seasonality component for each day of the week.  

$\alpha_n, b_n$	Regression coefficients for the Fourier series expansion terms.


The reason for applying seasonal decomposition before outlier identification can be seen in the figure below: 

In [None]:
consumption_ = consumption['consumption'].dropna().values
x_d = np.linspace(consumption_.min(), consumption_.max(), 2000)
params, pdf = fit_pdf(x_d, consumption_)

with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))
    
    consumption['consumption'].plot(kind='hist', bins=100, density=True, alpha=0.3, ax=ax)
    pd.Series(pdf, x_d).plot(ax=ax)
    
    ax.legend(['Fitted Normal distribution', 'Actual distribution of power consumption'], 
              frameon=True, shadow=True, fontsize=12)

Since seasonality leads to multimodal distributions, methods that rely on the assumption that the data follows a Normal distribution – such as simple three-sigma rules, the Grubbs test or the Extreme Studentized Deviate (ESD) test  – should generally be used only ***after*** a seasonal filter has been applied to the data.

In [14]:
result = seasonal_predict(consumption, target_name='consumption')

The next plot shows the actual and the predicted power consumption for the first and the last month of 2016:

In [None]:
with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 7), dpi=96)
    layout = (2, 1)
    ax1 = plt.subplot2grid(layout, (0, 0))
    ax2 = plt.subplot2grid(layout, (1, 0))

    start = datetime(2016, 1, 1, 0)
    end = datetime(2016, 2, 1, 0)
    consumption['consumption'].loc[start:end].plot(ax=ax1, alpha=0.6)
    result.predicted.loc[start:end].plot(ax=ax1, alpha=0.4)
    ax1.set_xlabel('Hours')
    ax1.legend(['Power consumption', 'Seasonal prediction'], frameon=True, shadow=True)
    
    start = datetime(2016, 12, 1, 0)
    end = datetime(2017, 1, 1, 0)
    consumption['consumption'].loc[start:end].plot(ax=ax2, alpha=0.6)
    result.predicted.loc[start:end].plot(ax=ax2, alpha=0.4)
    ax2.set_xlabel('Hours')
    ax2.legend(['Power consumption', 'Seasonal prediction'], frameon=True, shadow=True)

fig.tight_layout()

An overall sense for the quality of the fit can be gained by plotting a scatter plot of actual and predicted values:

In [None]:
with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))
    
    sns.regplot(x=result.predicted, y=consumption['consumption'], ax=ax, 
                scatter_kws=dict(alpha=0.01))
    
    ax.set_xlabel('Predicted consumption')
    ax.set_ylabel('Actual consumption')

The next plot shows the distribution of the residuals when subtracting the actual from the predicted power consumption. The distribution of the residuals resembles a Student’s t distribution and, hence, it is easier to work with for detecting outliers.

In [None]:
residuals_ = result.resid.dropna()
x_d = np.linspace(residuals_.min(), residuals_.max(), 2000)

_, pdf_normal = fit_pdf(x_d, residuals_)
_, pdf_t = fit_pdf(x_d, residuals_, distribution=stats.t)


with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))
    
    result.resid.plot(kind='hist', bins=100, density=True, alpha=0.3, ax=ax)
    pd.Series(pdf_normal, x_d).plot(ax=ax)
    pd.Series(pdf_t, x_d).plot(ax=ax)
    
    ax.legend(['Fitted Normal distribution', 'Fitted Student\'s t distribution',
               'Distribution of residuals'], 
              frameon=True, shadow=True, fontsize=12)

#### Step 3: Global outlier detection

The third step of the outlier detection process identifies observations in the available dataset as potential outliers if the value of their corresponding residuals lies outside the range defined by:

$$[mean^{all} - c\times scale^{all}, mean^{all} + c\times scale^{all}]$$

where:

$mean^{all}$ is the mean of a Student’s t distribution fitted on all the residual values

$scale^{all}$ is the scale of a Student’s t distribution fitted on all the residual values

$c$ is a user defined parameter (suggested value is 4).


In [18]:
outliers_global = global_outlier_detect(result.resid, c=4)

The next plot shows the potential outliers in power consumption identified using the global outlier detection for January and December 2016:

In [None]:
subset = consumption.loc[consumption.index.isin(outliers_global[outliers_global].index), 
                         'consumption']

with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 6), dpi=96)
    layout = (2, 1)
    ax1 = plt.subplot2grid(layout, (0, 0))
    ax2 = plt.subplot2grid(layout, (1, 0))
    
    start = datetime(2016, 1, 1, 0)
    end = datetime(2016, 2, 1, 0)
    consumption['consumption'].loc[start:end].plot(ax=ax1, alpha=0.6)
    subset.loc[start:end].plot(ax=ax1, style='o', ms=4, c='red', alpha=0.4)
    ax1.set_xlabel('Hours')
    ax1.legend(['Power consumption', 'Potential global outliers'], frameon=True, shadow=True)
    
    start = datetime(2016, 12, 1, 0)
    end = datetime(2017, 1, 1, 0)
    consumption['consumption'].loc[start:end].plot(ax=ax2, alpha=0.6)
    subset.loc[start:end].plot(ax=ax2, style='o', ms=4, c='red', alpha=0.4)
    ax2.set_xlabel('Hours')
    ax2.legend(['Power consumption', 'Potential global outliers'], frameon=True, shadow=True)
    
fig.tight_layout()

#### Step 4: Local outlier detection

The final step of the outlier detection process retains from the outliers identified in the previous step only those that can be characterised as outliers when we also compare their values with the observations in the same day of the year. 

The rationale for this approach can be explained by looking at the next plot, which shows the actual and the predicted power consumption during the first two (2) weeks of 2016 in the dataset. An important observation from the plot is that the distance from the seasonal model’s predictions is not by itself enough for detecting outliers when the whole day is misrepresented by the model (here a holiday is treated as a normal day).

In [None]:
start = datetime(2016, 1, 1, 0)
end = datetime(2016, 1, 1, 0) + timedelta(days=14)

with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))

    consumption['consumption'].loc[start:end].plot(ax=ax, alpha=0.8)
    result.predicted.loc[start:end].plot(ax=ax, alpha=0.4)
    
    ax.set_ylim(top=7000)
    ax.annotate(' First day of year ', xy=(datetime(2016, 1, 1, 12), 2200),  xycoords='data',
             xytext=(40, 140), textcoords='offset points',
             size=13, ha='center', va="center",
             bbox=dict(boxstyle="round", alpha=0.3),
             arrowprops=dict(arrowstyle="wedge,tail_width=0.5", alpha=0.3))
    
    ax.set_xlabel('Hours')
    ax.legend(['Power consumption', 'Seasonal prediction'], frameon=True, shadow=True)

Accordingly, the observations in the available dataset are marked as potential outliers if the value of their corresponding residuals lies outside the range defined by:

$$[median^{day} - c\times mad^{day}, median^{day} + c\times mad^{day}]$$

where:

$median^{day}$ is the median of all the residual values in the corresponding day

$mad^{day}$ is the median absolute deviation of all the residual values in the corresponding day

$c$ is a user defined parameter (suggested value is 4).


This step is parameterised by the minimum percentage of observations `min_samples` that must be available for any given day so that to take the daily statistics into account. If the number of the available observations is lower than this threshold, only the global outlier detection results are considered.

In [19]:
outliers_local = local_outlier_detect(result.resid, min_samples=0.6, c=4)

The next plot shows the potential outliers in power consumption identified using the local outlier detection for January and December 2016:

In [None]:
subset = consumption.loc[consumption.index.isin(outliers_local[outliers_local].index), 
                         'consumption']

with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 6), dpi=96)
    layout = (2, 1)
    ax1 = plt.subplot2grid(layout, (0, 0))
    ax2 = plt.subplot2grid(layout, (1, 0))
    
    start = datetime(2016, 1, 1, 0)
    end = datetime(2016, 2, 1, 0)
    consumption['consumption'].loc[start:end].plot(ax=ax1, alpha=0.6)
    subset.loc[start:end].plot(ax=ax1, style='o', ms=4, c='red', alpha=0.4)
    ax1.set_xlabel('Hours')
    ax1.legend(['Power consumption', 'Potential local outliers'], frameon=True, shadow=True)
    
    start = datetime(2016, 12, 1, 0)
    end = datetime(2017, 1, 1, 0)
    consumption['consumption'].loc[start:end].plot(ax=ax2, alpha=0.6)
    subset.loc[start:end].plot(ax=ax2, style='o', ms=4, c='red', alpha=0.4)
    ax2.set_xlabel('Hours')
    ax2.legend(['Power consumption', 'Potential local outliers'], frameon=True, shadow=True)
    
fig.tight_layout()

For an observation to be marked as an outlier, both global and local results must agree. 

In [20]:
outliers = np.logical_and(outliers_global, outliers_local)

In [None]:
with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))
    
    consumption['consumption'].plot(ax=ax, alpha=0.6)
    
    subset = consumption.loc[consumption.index.isin(outliers[outliers==True].index), 
                             'consumption']
    subset.plot(ax=ax, style='o', ms=3, c='red')
    
    ax.set_xlabel('Hours')
    ax.legend(['Power consumption', 'Potential outliers'], frameon=True, shadow=True)

In [22]:
consumption['consumption'] = consumption['consumption'].mask(outliers, other=np.nan)

### Repeat the process for temperature data

In [23]:
temperature = catalog.load('temperature_train')
temperature = validate_data(temperature, col_name='temperature', date_col_name='timestamp')

In [None]:
with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))

    temperature['temperature'].plot(ax=ax, alpha=0.5)
    ax.set_xlabel('Hours')

In [25]:
temperature['temperature'] = global_filter(temperature['temperature'], 
                                           no_change_window=3,
                                           allow_zero=True, 
                                           allow_negative=True)

No outliers found in the dataset:

In [26]:
outliers_global = global_outlier_detect(temperature['temperature'], c=4)
outliers_global[outliers_global==True].shape

(0,)

In [27]:
outliers_local = local_outlier_detect(temperature['temperature'], c=4)
outliers_local[outliers_local==True].shape

(0,)

### Impute missing values in the temperature data

In [30]:
print('Number of missing temperature values before: {}'
          .format(temperature['temperature'].isna().sum())
)

Number of missing temperature values before: 32655


In [31]:
temperature['temperature'] = linear_impute(temperature['temperature'], window=6)

In [32]:
print('Number of missing temperature values after: {}'
          .format(temperature['temperature'].isna().sum())
)

Number of missing temperature values after: 146


### Repeat the process for holiday data (if available)

In [34]:
holidays = None

if catalog.exists('holidays_train'):
    holidays = catalog.load('holidays_train')
    holidays = validate_data(holidays, col_name='holiday', date_col_name='timestamp')

### Merge the data

In [37]:
merged_data = pd.merge_asof(consumption, temperature, left_index=True, right_index=True,
                     direction='nearest', tolerance=pd.Timedelta('1H'))

if holidays is not None:
    holidays.index = holidays.index.map(lambda x: x.date)
    holidays = (merged_data.index.to_series()
                                 .map(lambda x: x.date())
                                 .map(lambda x: holidays.iloc[:,0]
                                 .get(x, default=np.nan))
                                 .to_frame('holiday')
    )
    merged_data = merged_data.join(holidays)

### Ensure that enough training data is available

In [40]:
missing = merged_data[['consumption']].mask(merged_data['temperature'].isna(), np.nan)

In [59]:
avail_data = dict()

for month_year, group in missing.groupby([lambda x: x.year, lambda x: x.month]):
    check = check_column_values_not_null(data=group, column='consumption', mostly=0.9)
    avail_data[month_year] = check.result['unexpected_percent']

avail_data = {f'{key[0]}M{key[1]}' :val for key, val in avail_data.items()}
avail_data = pd.DataFrame.from_dict(avail_data, orient='index', columns=['values'])

In [None]:
with plt.style.context('seaborn-whitegrid'):    
    fig = plt.figure(figsize=(12, 3.54), dpi=96)
    layout = (1, 1)
    ax = plt.subplot2grid(layout, (0, 0))
    
    subset = avail_data.mask(avail_data['values'] <= 0.1, 0) 
    subset.plot.bar(rot=25, ax=ax, color='#C71585', legend=False)
    
    subset = avail_data.mask(avail_data['values'] > 0.1, 0)
    subset.plot.bar(rot=25, ax=ax, color='#4682B4', legend=False)


### Impute missing values in the consumption data

In [61]:
print('Number of missing consumption values before: {}'
          .format(merged_data['consumption'].isna().sum())
)

Number of missing consumption values before: 326


In [62]:
imputed = iterative_impute(merged_data, target_name='consumption', 
                                        other_features='temperature')

In [63]:
merged_data['consumption_imputed'] = False

merged_data['consumption_imputed'] = (
        merged_data['consumption_imputed'].mask(merged_data['consumption'].isna(), other=True)
)

merged_data['consumption'] = (
        merged_data['consumption'].mask(merged_data['consumption_imputed'], other=imputed)
) 

In [64]:
print('Number of missing consumption values after: {}'
          .format(merged_data['consumption'].isna().sum())
)

Number of missing consumption values after: 0


### Save the data as an intermediate artifact

In [66]:
_ = catalog.save('merged_data_train', merged_data)

----------

### Apply this workflow step on Demo Building #2

The `eensight` package provides functionality for running the workflow step by step.

In [67]:
from eensight.workflow.steps import PreprocessStep

In [68]:
cfg = load_configuration(catalog='demo_site_02')

In [70]:
catalog = DataCatalog.from_config(catalog=cfg.catalog, 
                                  data_dir=cfg.data_dir, 
                                  ml_stages='train')

In [73]:
preprocess = PreprocessStep(catalog=catalog, 
                            parameters=cfg.parameters.preprocess, 
                            ml_stage='train', 
                            rebind=cfg.catalog.rebind_names
)

In [74]:
preprocess()

2021-05-20 14:14:01,797:INFO    : Running pre_execute
2021-05-20 14:14:01,798:INFO    : Running execute
2021-05-20 14:14:01,799:INFO    : Loading data from `consumption_train` (CSVDataSet)...
2021-05-20 14:14:03,831:INFO    : Loading data from `temperature_train` (CSVDataSet)...
2021-05-20 14:14:06,822:INFO    : Saving data to `merged_data_train` (CSVDataSet)...
2021-05-20 14:14:06,860:INFO    : Running post_execute
