In [1]:
import sys
import os
from copy import deepcopy
sys.path.append("..")
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from webapp.utils.azure_utils import KeyVault, DataLake
import dask.dataframe as dd
import dask.array as da
import river
from data_prep.prep import MeterDataSet

In [2]:
# Connect to Storage Account
vault = KeyVault(keyVaultName = "keyvaultdva2022")
storage_credential = vault.get_secret(secretName = "storagePrimaryKey")
storage = DataLake(account_name = "storageaccountdva", credential = storage_credential)

In [3]:
file_system = "energyhub"
dest_dir = "/data_parq/norm_data"
meters_dir = "/data_parq/meters"
meta_dir = "/data_parq/metadata"
weather_dir = "/data_parq/weather"
bad_building_dir = "/bad_buildings"

In [4]:
meter = "electricity"
metadata_cols = ['building_id', 'site_id','sq_meter', 'primary_space_usage']
weather_cols = ['site_id', 'timestamp', 'air_temperature', 'dew_temperature','wind_direction', 'wind_speed']

## Initiate the Class

In [5]:
electricity = MeterDataSet(meter, metadata_cols, weather_cols)

## Process weather

In [6]:
w_cols = weather_cols[2:]

In [7]:
weather = electricity.fill_weather_na(w_cols, "linear")

In [8]:
len(weather)

331166

In [9]:
weather.columns

Index(['timestamp', 'site_id', 'air_temperature', 'dew_temperature',
       'wind_direction', 'wind_speed'],
      dtype='object')

In [10]:
# weather_df = weather.compute()

In [11]:
# weather_df.isna().sum()*100/len(weather_df)

## Process Meter

In [12]:
meter = electricity.meter

In [13]:
meter.compute().isna().sum()*100/len(meter)

timestamp      0.0
building_id    0.0
electricity    0.0
dtype: float64

## Merge and filter out bad buildings

In [14]:
bad_buildings = storage.pandas_read(
            file_system, directory=bad_building_dir, file_name="bad_buildings.csv")

In [15]:
bad_buildings = bad_buildings.building_id.to_list()

In [16]:
electricity.merge()

In [17]:
electricity.filter_buildings(bad_buildings)

Unnamed: 0_level_0,timestamp,building_id,electricity,site_id,sq_meter,primary_space_usage,air_temperature,dew_temperature,wind_direction,wind_speed
npartitions=19,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
,datetime64[ns],category[unknown],float64,category[known],float64,category[unknown],float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...


In [18]:
df = electricity.df

In [19]:
df.info

<bound method DataFrame.info of Dask DataFrame Structure:
                     timestamp        building_id electricity          site_id sq_meter primary_space_usage air_temperature dew_temperature wind_direction wind_speed
npartitions=19                                                                                                                                                       
                datetime64[ns]  category[unknown]     float64  category[known]  float64   category[unknown]         float64         float64        float64    float64
                           ...                ...         ...              ...      ...                 ...             ...             ...            ...        ...
...                        ...                ...         ...              ...      ...                 ...             ...             ...            ...        ...
                           ...                ...         ...              ...      ...                 ...     

In [45]:
len(df)

25149355

In [20]:
# len(df.building_id.unique())
# 1573 unique buildings before filtering bad buildings

In [21]:
# len(df.building_id.unique())

## Add features

In [22]:
df['usage_lag1'] = df['electricity'].shift(1)
df['usage_lag2'] = df['electricity'].shift(2)
df['usage_lag3'] = df['electricity'].shift(3)

In [30]:
df["hour"] = df.timestamp.dt.hour
df["weekday"] = df.timestamp.dt.weekday
df["month"] = df.timestamp.dt.month
df["year"] = df.timestamp.dt.year 
df["weekday_hour"] = df.weekday.astype(str) + "-" + df.hour.astype(str)

In [32]:
df["ts"] = (df.timestamp - pd.to_datetime("2016-01-01")).dt.total_seconds() // 3600

In [36]:
df["hour_x"] = np.cos(2*np.pi*df.ts/24)
df["hour_y"] = np.sin(2*np.pi*df.ts/24)
    
df["month_x"] = np.cos(2*np.pi*df.ts/(30.4*24))
df["month_y"] = np.sin(2*np.pi*df.ts/(30.4*24))
    
df["weekday_x"] = np.cos(2*np.pi*df.ts/(7*24))
df["weekday_y"] = np.sin(2*np.pi*df.ts/(7*24))

In [23]:
def add_time_features(df):
    """
    Adapted from: https://github.com/buds-lab/ashrae-great-energy-predictor-3-solution-analysis/blob/master/solutions/rank-1/scripts/02_preprocess_data.py
    """
    df.timestamp = pd.to_datetime(df.timestamp)
    # time features
    df["hour"] = df.timestamp.dt.hour
    df["weekday"] = df.timestamp.dt.weekday
    df["month"] = df.timestamp.dt.month
    df["year"] = df.timestamp.dt.year    
    
    # time interactions
    df["weekday_hour"] = df.weekday.astype(str) + "-" + df.hour.astype(str)
    
    # apply cyclic encoding of periodic features
    df["ts"] = (df.timestamp - pd.to_datetime("2016-01-01")).dt.total_seconds() // 3600
    df["hour_x"] = np.cos(2*np.pi*df.ts/24)
    df["hour_y"] = np.sin(2*np.pi*df.ts/24)
    
    df["month_x"] = np.cos(2*np.pi*df.ts/(30.4*24))
    df["month_y"] = np.sin(2*np.pi*df.ts/(30.4*24))
    
    df["weekday_x"] = np.cos(2*np.pi*df.ts/(7*24))
    df["weekday_y"] = np.sin(2*np.pi*df.ts/(7*24))

In [24]:
# df_pd = df.compute()

In [38]:
# add_time_features(df)

## Store the data in Azure for use later

In [50]:
file_system = "energyhub"
directory = "data_parq/norm"

In [46]:
buildings = df.building_id.unique().compute()

In [48]:
buildings[:3]

0       Panther_parking_Alaina
1    Panther_office_Clementine
2        Panther_retail_Lester
Name: building_id, dtype: category
Categories (1636, object): ['Bear_assembly_Angel', 'Bear_assembly_Beatrice', 'Bear_assembly_Danial', 'Bear_assembly_Diana', ..., 'Peacock_lodging_Erica', 'Peacock_office_Ronda', 'Swan_unknown_Darrin', 'Swan_unknown_Yoshiko']

In [52]:
for building in buildings:
    building_df = df.loc[df.building_id == building, :]
    file_name = "norm_" + building + ".parq"
    building_df.to_parquet(path = file_name, engine = "pyarrow", compression = "gzip", write_index = False)
    storage.upload(file_system, directory = directory, file_name = file_name, file_path = file_name, overwrite=True)
    os.remove(file_name)

RuntimeError: Schemas are inconsistent, try using `to_parquet(..., schema="infer")`, or pass an explicit pyarrow schema. Such as `to_parquet(..., schema={"column1": pa.string()})`