This notebook outlines how the training data is obtained. It requires the python modules located in this folder.

### Weather Data

There are several points where intermediate data frames are written to disk in order to provide incremental checkpoints, since this dataset takes a while to download and build.

In [1]:
from weather_data import get_station_df
from os.path import isfile
from datetime import timedelta, timezone, datetime
import multiprocessing
from joblib import Parallel, delayed
import pandas as pd
import numpy as np

num_cores = multiprocessing.cpu_count()
parallel = Parallel(n_jobs=num_cores)

stations_dict = {'IA' : ['DSM', 'CID'],
                 'MN' : ['DLH', 'JKJ', 'LYV', 'MSP', 'RST'], 
                 'WI' : ['MSN', 'MKE', 'EAU', 'GRB'],
                 'MI' : ['ANJ', 'GRR', 'LAN', 'DET', 'ARB'],
                 'IN' : ['EVV', 'FWA', 'GYY', 'IND', 'SBN', 'SPI'],
                 'IL' : ['BMI', 'CMI', 'ARR', 'PIA'],
                 'MO' : ['STL', 'COU', 'SGF', 'MKC'],
                 'MS' : ['HKS', 'MJD', 'TUP', 'MEI'],
                 'LA' : ['BTR', 'LFT', 'LCH', 'SHV', 'AEX'],
                 'TX' : ['LFK'] }

stations = [(state, station) for state in stations_dict.keys() for station in stations_dict[state]]

def download_file_path(state, station):
    return f'../data/{state}_{station}.parquet'

def est(yyyy, mm, dd, hh):
    return datetime(yyyy, mm, dd, hh, tzinfo=timezone(timedelta(hours=-5)))

In [3]:

first_hour = est(2015,  2, 1,  0)
last_hour  = est(2021, 12, 31, 23)
observation_dates = pd.date_range(start = first_hour, end = last_hour)
observation_hours = [d.replace(hour = h) for d in observation_dates for h in range(0, 24)]

def normalize_station(station):
    station['valid'] = pd.to_datetime(station['valid'], utc = True)
    station = station[station['tmpf'] != 'M']
    numeric_cols = ['tmpf', 'lat', 'lon']
    station[numeric_cols] = station[numeric_cols].apply(pd.to_numeric, axis=1)

    station['feel'] = np.where(station['feel'] == 'M', station['tmpf'], station['feel'])
    station['feel'] = station[['feel']].apply(pd.to_numeric, axis=1)
    return station

def download_station(state, station):
    path = download_file_path(state, station) 
    if isfile(path):
        return pd.read_parquet(path)

    station = get_station_df(station, first_hour, last_hour)
    if station is None:
        print(f'Retrieve {station} failed')
        return None
    return station.to_parquet(path)

_ = parallel(delayed(download_station)(state, station) for (state, station) in stations)

In [5]:
def build_hourly_df(state, station):
    path = f'../data/weather/big_cities/slim/{station}.parquet'
    if isfile(path):
        return pd.read_parquet(path)

    w = pd.read_parquet(download_file_path(state, station))

    w = w.drop_duplicates('valid').set_index('valid')

    print(f'Read {station}')
    # Many stations provide more than one observation per hour, so subsample
    w = w.head(10)
    obs = observation_hours[1:100]
    w = w.iloc[[w.index.get_loc(h, method='nearest') for h in obs]]

    print(f'Hourly subsampled {station}')
    # Each station's data will be merged into a single dataframe, so rename columns
    w = w[['tmpf', 'feel']]
    w = w.rename(columns={'tmpf':f"{station}_tmpf", 'feel':f"{station}_feel"})
    #w = normalize_station(w)
    return w

#dfs = parallel(delayed(build_hourly_df)(state, station) for (state, station) in stations)
dfs = []
for (state, station) in stations:
    dfs.append(build_hourly_df(state, station))

merged = dfs[0]
for w in dfs[1:]:
    merged = merged.merge(w, left_index=True, right_index=True, how='inner')

Read DSM
Hourly subsampled DSM


KeyError: "['valid'] not in index"

### Obtain the Regional MTLF and Actual Load for each Observation Hour

In [None]:
from rf_al_data import get_daily_rf_al_df
forecast_output_dir = '../data/mtlf'
rf_al_df = get_daily_rf_al_df(first_hour, last_hour, forecast_output_dir)

rf_al_df.to_parquet(f'{forecast_output_dir}/rf_al_all.parquet')

Fetching hourly Regional Forecasts and Actual Load for 2524 days


### Validate Data 

In [None]:
(num_weather_observations, _) = merged.shape
(num_load_observations, _) = rf_al_df.shape
assert num_weather_observations == num_load_observations

AssertionError: 

In [None]:
num_load_observations

60576

In [None]:
num_weather_observations

58374