# Imputation Experiment

Testing how different imputation methods perform given different NA gap sizes.
The test is then performed in parallel with Ray.

Setup

In [None]:
# Setup
import numpy as np
import pickle
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import plotly.express as px
from pathlib import Path
import seaborn as sns
from sklearn.metrics import mean_absolute_error
from statsforecast import StatsForecast
from statsforecast.models import AutoETS
from prophet import Prophet
import plotly.express as px
import ray

# Multiple outputs per cell
%config InteractiveShell.ast_node_interactivity = 'all'

# Functions
def create_hour_minute(df):
    '''
    This function is supplemental to the following 'add_time_features' function,
    creating an hour_minute variable
    '''
    # Initialize empty variable
    df['hour_minute'] = None

    # Conditions
    conditions = [df['utc_timestamp'].dt.minute == 15,
                  df['utc_timestamp'].dt.minute == 30,
                  df['utc_timestamp'].dt.minute == 45]
    
    transform = [df['utc_timestamp'].dt.hour + .25,
                 df['utc_timestamp'].dt.hour + .5,
                 df['utc_timestamp'].dt.hour + .75]
    
    # given conditions, transform 
    converted_times = np.select(conditions, transform, df['utc_timestamp'].dt.hour)

    return converted_times

def create_impute_groups(data, y):
    '''
    Given dataframe with variable to impute, returns 
    a group number for that NA, how many consecutive
    NAs there are, and the first + last index values of 
    the consecutive NAs
    '''
    # note NA values in dataframe
    df = (data.set_index('utc_timestamp') # data or self.data
            .isna()
            .reset_index(drop=True)
          )
    
    # Assign groups of NAs a unique group number
    group_num = 0
    groups = [0]
    for i in range(len(df[y].values)-1): # y or self.y
        value = df[y].values[i]
        next_value = df[y].values[i+1]

        if value == False and next_value == True:
            group_num += 1
        if next_value == None:
            continue
        if value == False:
            groups.append(0)
        else: 
            groups.append(group_num)

    # Add na groups and index columns to df
    df['missing_group_num'] = groups 
    df['index'] = list(range(0, len(df)))

    missing_lengths = (df.groupby('missing_group_num', as_index=False)
                       .agg(consecutive = ('missing_group_num', 'count'),
                            first_na_idx = ('index', 'min'),
                            # adding 2 to last index for imputation
                            last_na_idx = ('index', lambda x: x.max() + 2)) 
                       .iloc[1:] # remove non_na groups
                       .sort_values('first_na_idx')
                       )
    
    return missing_lengths

def agg_results(impute_results):
    cols = [*impute_results.columns[3:]] # ignoring cols to not agg over

    agg_result = (impute_results.groupby('n', as_index=False)
                        .agg({col: [(f'{col}_mean', 'mean'), (f'{col}_std', 'std')] for col in cols}))

    agg_result.columns = [f'{col[1]}' for col in agg_result.columns] # col[1]
    agg_result = agg_result.rename(columns={'': 'n'}).round(3)

    return agg_result

def plot_imputation_results(agg_result):
    mean_cols = [col for col in agg_result.columns if col.endswith('_mean')]

    fig = px.line(
        data_frame=agg_result,
        x='n',
        y=mean_cols,
        title='Imputation Comparison Results'
    )

    return fig

In [4]:
# Read in data for developing imputation method
cd = Path.cwd()
data_dir = str(cd.parents[1])
load_energy_dt = data_dir + '/datasets/country_energy/country_load.pickle'

# Read pickle data 
with open(load_energy_dt, 'rb') as f:
    country_actuals = (pickle.load(f)[['utc_timestamp', 'country', 'load_actual_entsoe_transparency']])

## Impute method comparison
- Select AT due to their being no NA values for imputation test
- Create window of size n: 2 through 96 (by 2)
    - Randomly sample 24 windows slightly larger than size n from dataset
    - Place n nans in dataset
    - Across the 24 windows, get average mae for a given imputation method
- Display table results of imputation methods across the windows
- Display plots of results comparing methods

### Testing Loop parallelized with Ray

Functions for test, modified for ray

In [None]:
@ray.remote
def create_hour_minute(df):
    '''
    This function is supplemental to the following 'add_time_features' function,
    creating an hour_minute variable
    '''
    # Initialize empty variable
    df['hour_minute'] = None

    # Conditions
    conditions = [df['utc_timestamp'].dt.minute == 15,
                  df['utc_timestamp'].dt.minute == 30,
                  df['utc_timestamp'].dt.minute == 45]
    
    transform = [df['utc_timestamp'].dt.hour + .25,
                 df['utc_timestamp'].dt.hour + .5,
                 df['utc_timestamp'].dt.hour + .75]
    
    # given conditions, transform 
    converted_times = np.select(conditions, transform, df['utc_timestamp'].dt.hour)

    return converted_times

@ray.remote
def create_impute_data(data):
    '''
    Creates main dataset for this experiment, which includes day,
    weekday, and hour_minute. The chosen country, 'AT' (Austria),
    has complete data, making experimentation easy. 
    '''
    impute_data = (data.assign(
            day = lambda x: x['utc_timestamp'].dt.day,
            weekday = lambda x: x['utc_timestamp'].dt.strftime('%a'),
            hour_minute = lambda x: ray.get(create_hour_minute.remote(x))
        )
        .loc[lambda x: x.country == 'AT']
        .reset_index(drop=True)
        ) 

    return impute_data

@ray.remote
def create_window_idx(data, num_windows=24):
    # indices to loop over for get mae across 24 samples
    impute_windows = list(data.sample(num_windows, random_state=24).index)
    return impute_windows

@ray.remote
def create_weekday_impute(data):
    '''
    Creates average over a weekday time period, 
    for imputing load_actal_entsoe_transparency values.
    '''
    # Weekday impute method
    weekday_impute = (data.groupby(['weekday', 'hour_minute'], as_index=False)
                    .agg(avg_weekday_actuals = ('load_actual_entsoe_transparency', lambda x: x.mean().round(2)))
                    )
    week_order = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
    weekday_impute['weekday'] = pd.Categorical(
        weekday_impute['weekday'], 
        categories=week_order, 
        ordered=True
        )
    weekday_impute = weekday_impute.sort_values(by=['weekday','hour_minute'])
    
    return weekday_impute

@ray.remote
def create_impute_window(data, n, idx, forecast=False): 
    '''
    Creates imputation window with n NAs at index idx. 
    Window:
        1 row of available data, 
        n rows of NAs, 
        2 rows of available data
    
    If forecast=True, then there is up to 6months of data before NA values.
    
    This is done to ensure all imputation methods can impute without error
    '''

    window = data.iloc[idx-1:idx+n+2]
    window['to_impute'] = window['load_actual_entsoe_transparency'];  
    window.loc[idx:idx+n-1, 'to_impute'] = np.nan

    # forecast impute window, which has additional data for forcasting
    if forecast:
        ### Future: consider dynamic forecast window
        one_year_data = int((2880 * 12) / 2) # Selecting all data available, up to 6 months
        fcst_start = max(idx - one_year_data, 0) 
        fcst_window = data.iloc[fcst_start:idx+n+2]
        fcst_window['to_impute'] = fcst_window['load_actual_entsoe_transparency'];  
        fcst_window.loc[idx:idx+n-1, 'to_impute'] = np.nan

        return fcst_window
    
    return window

@ray.remote
def create_impute_groups(data, y):
    '''
    Given dataframe with variable to impute, returns 
    a group number for that NA, how many consecutive
    NAs there are, and the first + last index values of 
    the consecutive NAs
    '''
    # note NA values in dataframe
    df = (data.set_index('utc_timestamp') # data or self.data
            .isna()
            .reset_index(drop=True)
          )
    
    # Assign groups of NAs a unique group number
    group_num = 0
    groups = [0]
    for i in range(len(df[y].values)-1): # y or self.y
        value = df[y].values[i]
        next_value = df[y].values[i+1]

        if value == False and next_value == True:
            group_num += 1
        if next_value == None:
            continue
        if value == False:
            groups.append(0)
        else: 
            groups.append(group_num)

    # Add na groups and index columns to df
    df['missing_group_num'] = groups 
    df['index'] = list(range(0, len(df)))

    missing_lengths = (df.groupby('missing_group_num', as_index=False)
                       .agg(consecutive = ('missing_group_num', 'count'),
                            first_na_idx = ('index', 'min'),
                            # adding 2 to last index for imputation
                            last_na_idx = ('index', lambda x: x.max() + 2)) 
                       .iloc[1:] # remove non_na groups
                       .sort_values('first_na_idx')
                       )
    
    return missing_lengths

@ray.remote
def forecast_impute(data, y, model_type='AutoETS'): # AutoETS AAA
    '''
    Given forecasting data and y, creates a forecast model
    with a horizon depending on the number of NAs needing
    imputation.
    Currently, can select 'AutoETS' or 'Prophet' as forecasting models.
    '''
    # from statsforecast import StatsForecast
    # from statsforecast.models import AutoETS
    data = data.reset_index(drop=True)
    impute_info = ray.get(create_impute_groups.remote(data, y))
        
    first_na = impute_info['first_na_idx'].iloc[0] 

    # Defining start, end, and horizon points of training window
    one_year_data = 2880 * 6 # Selecting all data available, up to 6 months
    start = max(first_na - one_year_data, 0)  
    end = first_na - 1 
    
    horizon = impute_info['consecutive'].iloc[0]
    fcst_df = (data.iloc[start:end] 
            .assign(index = 1) 
            [['index', 'utc_timestamp', y]]) # --- This needs to be generalized

    if model_type == 'AutoETS':
        pd.options.mode.chained_assignment = None

        fcst_df = (data.iloc[start:end] 
            .assign(index = 1) 
            [['index', 'utc_timestamp', y]])

        impute_model = StatsForecast(
            models=[AutoETS(model='AAA', season_length=96)], # --- season_length=96 (1 day) or maybe 1 week
            freq='15min', # --- Needs generalizaiton
            n_jobs=-1,
            verbose=False
            ) 

        result = impute_model.forecast(
            df=fcst_df, 
            id_col='index',
            time_col='utc_timestamp', # --- Needs generalization 
            target_col=y, # --- Needs generalization
            h=horizon,
            # show progress = False,
            ).rename(columns={'AutoETS': 'imputed'})
        
        return result['imputed'] # --- Needs generalization
    
    if model_type == 'Prophet':
        fcst_df = (data.iloc[start:end] 
           .rename(columns={'utc_timestamp': 'ds',
                             f"{y}": 'y'})
            [['ds', 'y']])
            
        fcst_df['ds'] = fcst_df['ds'].dt.tz_localize(None)
        
        model = (Prophet(
            yearly_seasonality=True,
            weekly_seasonality=True,
            seasonality_mode='additive',
            )
            .add_seasonality(name='intraday', 
                             period=96/2,  # (4 * 24 / 2) 15 min intervals
                             fourier_order=4)
            .fit(fcst_df[1:])) # ds and y only
        # further improvement if holidays are added

        # create dataframe of length horizon with the ds component
        predict_window = (data.iloc[first_na-1:first_na+horizon-1]
                          .rename(columns={'utc_timestamp': 'ds'})
                          [['ds']])
        predict_window['ds'] = predict_window['ds'].dt.tz_localize(None)        
        
        result = (model.predict(predict_window)
                  .rename(columns={'yhat': 'to_impute'}))
        
        return result['to_impute'] # --- Needs generalization

@ray.remote
def imputation_test(data, n, idx, i):
    '''
    Imputes n values in the data with several imputation methods and 
    stores the results.

    This function pairs with th eimputation comparison function,
    resulting in an automated loop for comparing imputation values
    '''
    
    # create impute windows
    window = ray.get(create_impute_window.remote(data, n, idx))
    fcst_window = ray.get(create_impute_window.remote(data, n, idx, forecast=True))
    weekday_impute = ray.get(create_weekday_impute.remote(data))

    # Weekday average impute method
    window = window.merge(weekday_impute, how='left', on=['weekday', 'hour_minute'])

    # Interpolation impute methods
    window['lin_nearest_impute'] = window['to_impute'].interpolate(method="nearest")
    window['lin_linear_impute'] = window['to_impute'].interpolate(method="linear")
    window['spline_impute'] = window['to_impute'].interpolate(method="spline", order=2)

    # # fcst_impute with load_actual, horizon chosen within, then add rows to the window to calculate the MAE
    fcst_impute = ray.get(forecast_impute.remote(fcst_window, 'to_impute', model_type='AutoETS'))  # Prophet
    prophet_fcst_impute = ray.get(forecast_impute.remote(fcst_window, 'to_impute', model_type='Prophet'))
    del fcst_window
    first_value = window['to_impute'][0]
    last_values = window['to_impute'][-2:].values

    fcst_imputed = [first_value, *fcst_impute.values, *last_values]
    prophet_fcst_imputed = [first_value, *prophet_fcst_impute.values, *last_values]

    window['fcst_imputed'] = fcst_imputed
    window['fcst_impute_smooth'] = pd.Series(fcst_imputed).ewm(alpha=.5).mean()
    window['prophet_fcst_imputed'] = prophet_fcst_imputed
    window['prophet_fcst_imputed_smooth'] = pd.Series(prophet_fcst_imputed).ewm(alpha=.5).mean()

    lin_nearest_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['lin_nearest_impute']
        )
    lin_linear_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['lin_linear_impute']
        )
    spline_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['spline_impute']
        )
    weekday_actuals_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['avg_weekday_actuals']
        )
    fcst_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['fcst_imputed']
        )
    prophet_fcst_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['prophet_fcst_imputed']
        )
    fcst_smooth_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['fcst_impute_smooth']
        )
    prophet_fcst_smooth_mae = mean_absolute_error(
        window['load_actual_entsoe_transparency'], 
        window['prophet_fcst_imputed_smooth']
        )

    result = pd.DataFrame({ 
        "trial": [i+1],
        "n": [n],
        "lin_nearest_mae": [lin_nearest_mae],
        "lin_linear_mae": [lin_linear_mae],
        "spline_mae": [spline_mae],
        "weekday_actuals_mae": [weekday_actuals_mae],
        "fcst_mae": [fcst_mae],
        "prophet_fcst_mae": [prophet_fcst_mae],
        "fcst_smooth_mae": [fcst_smooth_mae],
        "prophet_fcst_smooth_mae": [prophet_fcst_smooth_mae],
        })

    return result

@ray.remote
def imputation_comparison(data, impute_windows, start=2, end=96+1, increment=2):
    
    final_results = pd.DataFrame({})
    for n in range(start, end, increment): # range(2, 96+1, 2):
        
        # parallelize inner loop
        inner_futures = [imputation_test.remote(data, n, idx, i) for i, idx in enumerate(impute_windows)]
        
        results = ray.get(inner_futures)

        for i in range(len(results)):
            final_results = pd.concat([final_results, results[i]])
        
        # Put and save the results for later
        object_id = ray.put(final_results, _owner=None)
        print(f"Result stored with ID: {object_id}")
    
    return final_results

### Small gap imputation test

In [None]:
%%capture

ray.shutdown()
### --- ray metrics launch-prometheus
ray.init(num_cpus=7, num_gpus=0)
impute_data = create_impute_data.remote(country_actuals)
input_data = ray.get(impute_data)
impute_windows = ray.get(create_window_idx.remote(input_data))
weekday_impute = create_weekday_impute.remote(input_data)

imputation_results = imputation_comparison.remote(
    input_data, 
    impute_windows,
    start=2, 
    end=96+1, 
    increment=4)

In [None]:
# retrieve results
result = ray.get(imputation_results)
# Write results locally as backup
result.to_csv('imputation_results_2.csv')

ray.shutdown()

Plot comparison of imputation methods

In [24]:
cd = Path.cwd()
data_dir = str(cd.parents[0])
filepath = data_dir + '\\load_forecasting_analysis\\imputation_results_2.csv'

impute_results = pd.read_csv(filepath)

agg_result = agg_results(impute_results)


Small Gap Results

In [32]:
fig = plot_imputation_results(agg_result)
fig

fig.write_html("imputation_plot_4_19_2025.html")
# Next steps: 
# Include standard deviation and confidence intervals
# Include seasonal decomposition methods
# Extend imputation window to as much as 5 days, but reduce granularity

In [64]:
agg_result

Unnamed: 0,n,lin_nearest_mae_mean,lin_nearest_mae_std,lin_linear_mae_mean,lin_linear_mae_std,spline_mae_mean,spline_mae_std,weekday_actuals_mae_mean,weekday_actuals_mae_std,fcst_mae_mean,fcst_mae_std,prophet_fcst_mae_mean,prophet_fcst_mae_std,fcst_smooth_mae_mean,fcst_smooth_mae_std,prophet_fcst_smooth_mae_mean,prophet_fcst_smooth_mae_std
0,2,32.433,25.853,17.869,13.929,18.059,15.374,636.211,380.82,229.925,181.807,177.866,134.162,249.403,190.519,188.752,135.144
1,6,95.011,70.636,53.065,29.233,44.783,38.232,610.087,343.085,373.924,292.911,292.63,213.562,390.38,306.904,296.585,209.367
2,10,159.321,94.259,101.027,69.363,75.152,51.339,594.924,329.557,429.378,337.786,343.792,238.589,442.661,355.463,341.358,236.037
3,14,221.066,131.863,160.419,113.779,145.768,104.84,582.617,325.494,451.844,364.575,374.591,252.753,460.903,378.137,365.667,248.27
4,18,288.971,156.199,220.905,149.209,228.159,205.575,571.805,318.247,461.291,381.329,385.122,256.657,467.552,387.144,373.978,250.526
5,22,364.646,158.265,302.456,187.176,258.641,158.118,565.418,304.268,467.432,386.947,392.195,259.093,471.769,385.783,380.436,250.976
6,26,412.216,200.5,349.942,238.135,424.956,427.905,552.982,291.894,466.838,379.363,396.957,259.506,471.825,374.346,389.032,248.826
7,30,476.12,210.3,426.07,244.388,403.114,252.409,544.503,287.481,465.907,371.692,402.714,259.735,471.009,367.149,399.089,249.428
8,34,496.924,202.616,473.59,230.744,478.347,318.448,534.763,285.506,470.139,362.319,411.456,257.416,474.561,359.37,408.911,248.037
9,38,557.615,200.498,531.193,241.171,437.068,230.014,526.111,278.88,474.784,356.276,419.072,250.609,477.815,355.752,417.441,242.979


For this data, the best imputation method depends on the size of NA groups. A NA gap size near 30 missing values shows alternative imputation methods being preferable, such as the constructed forecast and weekday actuals methods.

Importantly, the std of the forecasting and weekday actuals imputation methods are greater around n=30 than linear interpolation.




### Large gap imputation test

In [37]:
%%capture
import ray

ray.shutdown()
### --- ray metrics launch-prometheus
ray.init(num_cpus=7, num_gpus=0)
impute_data = create_impute_data.remote(country_actuals)
input_data = ray.get(impute_data)
impute_windows = ray.get(create_window_idx.remote(input_data))
weekday_impute = create_weekday_impute.remote(input_data)

imputation_results = imputation_comparison.remote(
    input_data, 
    impute_windows,
    start=100,
    end=500, # just over 2 days
    increment=25,
    )

# retrieve results
result = ray.get(imputation_results)
# Write results locally as backup
result.to_csv('imputation_results_large_gaps.csv')
ray.shutdown()

Large Gap Results

In [11]:
cd = Path.cwd()
data_dir = str(cd.parents[0])
filepath = data_dir + '\\load_forecasting_analysis\\imputation_results_large_gaps.csv'

impute_results = pd.read_csv(filepath)

agg_result = agg_results(impute_results)

fig = plot_imputation_results(agg_result)
fig
fig.write_html("imputation_plot_large_gaps_4_19_2025.html")

In [12]:
agg_result

Unnamed: 0,n,lin_nearest_mae_mean,lin_nearest_mae_std,lin_linear_mae_mean,lin_linear_mae_std,spline_mae_mean,spline_mae_std,weekday_actuals_mae_mean,weekday_actuals_mae_std,fcst_mae_mean,fcst_mae_std,prophet_fcst_mae_mean,prophet_fcst_mae_std,fcst_smooth_mae_mean,fcst_smooth_mae_std,prophet_fcst_smooth_mae_mean,prophet_fcst_smooth_mae_std
0,100,1047.186,416.224,1011.865,416.924,1666.875,1457.53,509.304,275.218,567.295,387.851,470.359,272.404,571.307,384.795,475.697,266.977
1,125,1229.793,430.922,969.518,277.503,1609.753,841.829,507.215,264.166,603.604,404.418,483.493,288.265,611.33,399.764,490.266,282.273
2,150,1222.534,334.148,965.103,250.328,2149.148,1140.891,500.226,261.032,627.594,423.143,514.957,331.701,635.216,419.673,521.283,329.878
3,175,1201.679,346.268,1100.027,339.392,2346.709,1886.111,510.653,255.384,663.347,445.01,558.412,374.177,670.385,438.708,563.801,373.506
4,200,1145.509,336.709,1139.752,335.768,3234.706,3923.942,514.136,262.429,693.307,454.222,588.363,397.469,700.453,447.98,594.725,394.402
5,225,1060.658,273.3,1012.428,227.652,3030.476,2595.992,506.967,262.621,723.651,456.649,620.415,417.049,730.671,451.315,627.012,413.606
6,250,1077.238,339.485,1037.528,282.461,3832.24,3591.187,507.164,259.783,747.4,461.443,665.603,453.26,754.234,457.017,671.893,451.734
7,275,1190.944,389.669,1170.262,370.96,4494.761,3639.887,512.377,255.097,773.461,456.844,717.841,487.633,779.993,451.511,723.361,486.348
8,300,1261.717,383.653,1190.223,365.691,3302.974,2062.827,519.262,257.244,797.662,445.016,761.923,514.188,804.525,439.926,767.554,511.479
9,325,1232.111,291.693,1077.876,241.993,4360.52,3843.375,520.998,255.008,829.415,427.691,813.997,543.125,835.574,423.867,820.596,540.22


As a group of NAs exceeds 150, weekday_actuals is a preferably imptation method. Prior to that, the prophet forecast gives the lowest mae

### TimeSeriesImpute 

Given experiment, developed TimeSeriesImpute. Results are compared to linear interpolation.


In [5]:
from forecast_pipeline.TimeSeriesImpute import *

In [None]:
ts_imp = TimeSeriesImpute()
prepped_data = ts_imp.create_impute_data(country_actuals, country='LU')
test = ts_imp.impute_timeseries_gaps(prepped_data, 'load_actual_entsoe_transparency')
test.head()

In [11]:
print('Ensure imputation is occuring correctly:')
print(f"\nDF number NAs: {sum(prepped_data['load_actual_entsoe_transparency'].isna())}, Num NAs after imputation: {sum(test['load_actual_entsoe_transparency'].isna())}")
print(f"\n{prepped_data.shape = }, {test.shape = }")

Ensure imputation is occuring correctly:

DF number NAs: 2664, Num NAs after imputation: 1

prepped_data.shape = (151779, 6), test.shape = (151779, 6)
