# Pre Process Load Forecasting Data

Load forecasting data is expecting data the following schema:

```json
{
  "MeterNumber": "string",
  "ReadingTimestamp": "datetime",
  "KWHConsumption": "double",
  "Latitude", "double",
  "Longitude", "double"
  "Grouping", "string"
}
```

This script processes the data into specific meter files for training (many model training), as well as one specific model file. 
It: 
1. Downloads historic weather for a given timestamp and location
2. Creates a training dataset for each meter. 

Training data is created with the following schema:
```
MeterNumber:string
ReadingTimestamp:timestamp
TemperatureC:float
DewPointC:float
RelativeHumidity:float
PrecipitationAmountmm:float
WindDirectionDegrees:float
WindSpeedKmh:float
VisibilityKm:float
StationPressurekPa:float
Humidex:float
WindChillC:float
Weather:string
```

In [None]:
#Imports
from pyspark.sql.types import *
from pyspark.sql.functions import col, lag, min, max,avg, sum, count, udf, year, month, dayofmonth, hour, row_number, abs, unix_timestamp, to_date, date_trunc, first, dayofweek
from pyspark.sql.window import Window

from dateutil.relativedelta import relativedelta
import datetime
import requests
import os
import sys


In [None]:
# mount the data lake
### 
###  It is up to you on how to mount the data lake container - however, it must be mounted to /mnt/lf for this example
###

Mounted load-forecasting successfully
Out[5]: 'wasbs://load-forecasting@stgmadlssharedcc.blob.core.windows.net'



First thing we do is to separate the meter readings into individual files.
This could be used for a variety of purposes.

In [None]:
meter_readings = spark.read \
  .parquet('/mnt/lf/input/readings.parquet').distinct()

#demo: limit to n meters
#demo_meters = meter_readings.select(col("MeterNumber")).distinct().limit(1000).rdd.map(lambda x : x[0]).collect()
#print(demo_meters)

#incase the source had any nulls
meter_reading = meter_readings.where(col("MeterNumber").isNotNull())
#optionally limit the meters
#meter_readings = meter_readings.filter(col("ReadingTimeStamp") < '2022-08-20').filter(meter_readings.MeterNumber.isin(demo_meters))

display(meter_readings.orderBy("meterNumber","ReadingTimestamp"))

ReadingTimestamp,KWHConsumption,Latitude,Longitude,MeterNumber,Grouping
2022-10-21T01:00:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T01:15:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T01:30:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T01:45:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T02:00:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T02:15:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T02:30:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T02:45:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T03:00:00.000+0000,0.0,51,-114,1000227,32
2022-10-21T03:15:00.000+0000,0.0,51,-114,1000227,32


## Get distinct list of meter locations

We need to get a distinct list of meter locations, and we'll use this to download data from weather providers

In [None]:
distinct_meter_locations = meter_readings \
    .select("MeterNumber", "Latitude", "Longitude", "ReadingTimestamp") \
    .groupBy("MeterNumber", "Latitude", "Longitude") \
    .agg(min("ReadingTimestamp").alias("MinTime"), max("ReadingTimestamp").alias("MaxTime"))
             
print("Found "+str(distinct_meter_locations.count())+" distinct meters")
display(distinct_meter_locations)

Found 6855 distinct meters


MeterNumber,Latitude,Longitude,MinTime,MaxTime
M978274,51,-114,2022-01-01T00:15:00.000+0000,2022-09-20T06:15:00.000+0000
M944548,51,-114,2022-01-01T00:15:00.000+0000,2022-07-07T19:30:00.000+0000
M977467,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T14:30:00.000+0000
M930514,51,-114,2022-01-01T00:15:00.000+0000,2022-09-20T03:15:00.000+0000
M994073,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T06:15:00.000+0000
M985087,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T05:00:00.000+0000
M978556,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T04:45:00.000+0000
M932666,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T04:45:00.000+0000
M995449,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T06:00:00.000+0000
M984624,51,-114,2022-01-01T00:15:00.000+0000,2022-08-18T05:00:00.000+0000


## Download Environment Canada Weather

This will download historical weather for a given date range.

In [None]:
ec_station_list_download_uri = 'PUT IN EXTERNAL FILE SOMWHERE TODO'
ec_station_list_save_path = '/mnt/lf/training/historical_weather/Station Inventory EN.csv'

if not os.path.isdir('/dbfs/mnt/lf/training/historical_weather'):
  os.mkdir('/dbfs/mnt/lf/training/historical_weather')

if not os.path.exists('/dbfs/'+ec_station_list_save_path):
    ec_station_list_response = requests.get(ec_station_list_download_uri)
    if ec_station_list_response.status_code == 200:
        print('Downloaded station list successfully!')

        with open('/dbfs/'+ec_station_list_save_path, 'w') as save_file:
          save_file.write(ec_station_list_response.text)
#IATA_ID,Name,WMO_ID,MSC_ID,Latitude,Longitude,Elevation(m),Data_Provider,Dataset/Network,AUTO/MAN,Province/Territory
ec_stations_schema = StructType([
  StructField('Station Name', StringType(), True),
  StructField('Province', StringType(), True),
  StructField('Climate ID', StringType(), True),
  StructField('Station ID', StringType(), True),
  StructField('WMO ID', StringType(), True),
  StructField('TC ID', StringType(), True),
  StructField('LatitudeDecimalDegrees', FloatType(), True),
  StructField('LongitudeDecimalDegrees', FloatType(), True),
  StructField('LatitudeString', StringType(), True),
  StructField('LongitudeString', StringType(), True),
  StructField('ElevationMeters', FloatType(), True),
  StructField('FirstYear', IntegerType(), True),
  StructField('LastYear', IntegerType(), True),
  StructField('HLYFirstyear', IntegerType(), True),
  StructField('HLYLastYear', IntegerType(), True),
  StructField('DLYFirstyear', IntegerType(), True),
  StructField('DLYLastYear', IntegerType(), True),
  StructField('MLYFirstyear', IntegerType(), True),
  StructField('MLYLastYear', IntegerType(), True),
])
ec_stations_df = spark.read.option("header",True).schema(ec_stations_schema).csv(ec_station_list_save_path)

display(ec_stations_df)

Station Name,Province,Climate ID,Station ID,WMO ID,TC ID,LatitudeDecimalDegrees,LongitudeDecimalDegrees,LatitudeString,LongitudeString,ElevationMeters,FirstYear,LastYear,HLYFirstyear,HLYLastYear,DLYFirstyear,DLYLastYear,MLYFirstyear,MLYLastYear
ACTIVE PASS,BRITISH COLUMBIA,1010066,14,,,48.87,-123.28,485200000,-1231700000,4.0,1984,1996,,,1984.0,1996.0,1984.0,1996.0
ALBERT HEAD,BRITISH COLUMBIA,1010235,15,,,48.4,-123.48,482400000,-1232900000,17.0,1971,1995,,,1971.0,1995.0,1971.0,1995.0
BAMBERTON OCEAN CEMENT,BRITISH COLUMBIA,1010595,16,,,48.58,-123.52,483500000,-1233100000,85.3,1961,1980,,,1961.0,1980.0,1961.0,1980.0
BEAR CREEK,BRITISH COLUMBIA,1010720,17,,,48.5,-124.0,483000000,-1240000000,350.5,1910,1971,,,1910.0,1971.0,1910.0,1971.0
BEAVER LAKE,BRITISH COLUMBIA,1010774,18,,,48.5,-123.35,483000000,-1232100000,61.0,1894,1952,,,1894.0,1952.0,1894.0,1952.0
BECHER BAY,BRITISH COLUMBIA,1010780,19,,,48.33,-123.63,482000000,-1233800000,12.2,1956,1966,,,1956.0,1966.0,1956.0,1966.0
BRENTWOOD BAY 2,BRITISH COLUMBIA,1010960,20,,,48.6,-123.47,483600000,-1232800000,38.0,1987,1997,,,1987.0,1997.0,1987.0,1997.0
BRENTWOOD CLARKE ROAD,BRITISH COLUMBIA,1010961,21,,,48.57,-123.45,483400000,-1232700000,30.5,1972,1980,,,1972.0,1980.0,1972.0,1980.0
BRENTWOOD W SAANICH RD,BRITISH COLUMBIA,1010965,22,,,48.57,-123.43,483400000,-1232600000,91.4,1960,1970,,,1960.0,1970.0,1960.0,1970.0
CENTRAL SAANICH VEYANESS,BRITISH COLUMBIA,1011467,25,,,48.58,-123.42,483500000,-1232500000,53.3,1963,1994,,,1963.0,1994.0,1963.0,1994.0


In [None]:
def download_historical_weather(area_ec_station_id):
    save_dir = '/dbfs/mnt/lf/training/historical_weather/'+area_ec_station_id
    if not os.path.isdir(save_dir):
      os.mkdir(save_dir)

    curr_date = datetime.datetime(2020, 1, 1)
    end_date = datetime.datetime.now()
    
    while curr_date < end_date:
        curr_date = curr_date + relativedelta(months=1) #data is by the month from the service
        save_path = '/dbfs/mnt/lf/training/historical_weather/'+area_ec_station_id+'/'+curr_date.strftime("%Y%m%d")+'.csv'

        if os.path.isfile(save_path):
            continue #already downloaded

        #Timespan 2 == daily, 1 == hourly, i think.
        url = 'https://climate.weather.gc.ca/climate_data/bulk_data_e.html?format=csv&stationID='+area_ec_station_id+'&Year='+str(curr_date.year)+'&Month='+str(curr_date.month)+'&Day='+str(curr_date.day)+'&timeframe=1&submit= Download+Data'
        print("Downloading '"+url+"'")
        ec_response = requests.get(url)
        if ec_response.status_code == 200:
            print('Downloaded successfully!')

            with open(save_path, 'w') as save_file:
              save_file.write(ec_response.text)

        else:
            continue

### Figure out which station is closest and active during the entirety of the AMI readings

In [None]:
from math import sin, cos, sqrt, atan2, radians
from pyspark.sql.types import FloatType

#define the UDF for calculating the distance between two points. 
def calculate_distance(lat0d, lon0d, lat1d, lon1d):

    # approximate radius of earth in m
    R = 6373.0 * 1000.0

    lat0 = radians(lat0d)
    lon0 = radians(lon0d)
    lat1 = radians(lat1d)
    lon1 = radians(lon1d)

    dlon = lon1 - lon0
    dlat = lat1 - lat0

    a = sin(dlat / 2)**2 + cos(lat0) * cos(lat1) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))

    distance = R * c
    
    return distance

calcDistanceUDF = udf(lambda lat0,lon0,lat1,lon1 : calculate_distance(lat0,lon0,lat1,lon1),FloatType())

In [None]:
#figure out which station is closest to the meter we're looking at
meter_distances_to_stations_df = distinct_meter_locations.crossJoin(ec_stations_df).withColumn("StationDistance", calcDistanceUDF(col('Latitude'), col('Longitude'),col('LatitudeDecimalDegrees'), col('LongitudeDecimalDegrees')))

#now find the closest station that's *within* the date range of the meter reading
meter_stations_df = meter_distances_to_stations_df \
    .withColumn("MinYear", year("MinTime")) \
    .withColumn("MaxYear", year("MaxTime")) \
    .filter('MinYear >= HLYFirstyear').filter('MaxYear >= HLYFirstyear')

#now find the closest station to out meter. 
windowSpec = Window.partitionBy("MeterNumber").orderBy("StationDistance")
meter_stations_df = meter_stations_df.withColumn("row_number", row_number().over(windowSpec)).filter(col("row_number") == 1)

meter_stations_df = meter_stations_df.select("MeterNumber", "Latitude","Longitude","MinTime","MaxTime", "Station ID", "Station Name","StationDistance")
meter_stations_df.cache()

display(meter_stations_df)
print(meter_stations_df.count())

MeterNumber,Latitude,Longitude,MinTime,MaxTime,Station ID,Station Name,StationDistance
1011172,51,-114,2022-10-25T07:15:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
1056927,51,-114,2022-09-23T07:30:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
875598,51,-114,2022-09-15T07:15:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
930508,51,-114,2022-10-20T21:30:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
930517,51,-114,2022-08-18T06:15:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
950245,51,-114,2022-10-21T00:30:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
978762,51,-114,2022-10-21T01:00:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
981486,51,-114,2022-01-01T00:15:00.000+0000,2022-08-17T23:15:00.000+0000,27211,CALGARY INT'L CS,12235.35
983656,51,-114,2022-10-21T01:00:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35
983986,51,-114,2022-10-21T00:45:00.000+0000,2022-10-28T23:45:00.000+0000,27211,CALGARY INT'L CS,12235.35


6855


### Download historical weather for each single station ID that we have.

In [None]:
station_ids_to_download = meter_stations_df.select("Station Id").distinct().collect()
for station in station_ids_to_download:
    print('downloading historical weather for ' +station['Station Id'])
    download_historical_weather(station['Station Id'])

downloading historical weather for 27211


### Generate meter readings with temperatures

In [None]:
ec_weather_schema = StructType([
    StructField('StationLatitude', FloatType(), True),
    StructField('StationLongitude', FloatType(), True),
    StructField('StationName', StringType(), True),
    StructField('ClimateID', StringType(), True),
    StructField('StationDateTimeLocal', TimestampType(), True),
    StructField('Year', IntegerType(), True),
    StructField('Month', IntegerType(), True),
    StructField('Day', IntegerType(), True),
    StructField('TimeLocal', StringType(), True),
    StructField('TemperatureC', FloatType(), True),
    StructField('TemperatureCFlag', StringType(), True),
    StructField('DewPointC', FloatType(), True),
    StructField('DewPointCFlag', StringType(), True),
    StructField('RelativeHumidity', FloatType(), True),
    StructField('RelativeHumidityFlag', StringType(), True),
    StructField('PrecipitationAmountmm', FloatType(), True),
    StructField('PrecipitationAmountmmFlag', StringType(), True),
    StructField('WindDirectionDegrees', FloatType(), True),
    StructField('WindDirectionDegreesFlag', StringType(), True),
    StructField('WindSpeedKmh', FloatType(), True),
    StructField('WindSpeedKmhFlag', StringType(), True),
    StructField('VisibilityKm', FloatType(), True),
    StructField('VisibilityKmFlag', StringType(), True),
    StructField('StationPressurekPa', FloatType(), True),
    StructField('StationPressurekPaFlag', StringType(), True),
    StructField('Humidex', FloatType(), True),
    StructField('HumidexFlag', StringType(), True),
    StructField('WindChillC', FloatType(), True),
    StructField('WindChillCFlag', StringType(), True),
    StructField('Weather', StringType(), True)
])


In [None]:
#read in all the station hourly weather data
station_hourly_weather_df = spark.read.option("header",True).schema(ec_weather_schema).csv('mnt/lf/training/historical_weather/*/*.csv') \
    .drop('TemperatureCFlag','DewPointCFlag','RelativeHumidityFlag','PrecipitationAmountmmFlag','WindDirectionDegreesFlag','WindSpeedKmhFlag','VisibilityKmFlag','StationPressurekPaFlag','HumidexFlag','WindChillCFlag')

stationids_df = meter_stations_df.select("Station Id", "Station Name")
station_hourly_weather_df = station_hourly_weather_df.join(stationids_df, stationids_df['Station Name'] == station_hourly_weather_df['StationName'],'inner').drop(stationids_df['Station Name']) \
    .withColumn("Hour", hour("StationDateTimeLocal"))

station_hourly_weather_df = station_hourly_weather_df.distinct()
station_hourly_weather_df.cache()

#add the StationID to the meter_readings
meter_station_mappings_df = meter_stations_df.select("MeterNumber","Station Id", "Station Name")
meter_readings_with_station_df = meter_readings.join(meter_station_mappings_df, meter_readings.MeterNumber == meter_station_mappings_df.MeterNumber, 
                                               "inner") \
    .drop(meter_station_mappings_df.MeterNumber) \
    .withColumnRenamed("Station Id", "StationId") \
    .withColumnRenamed("Station Name", "StationName")

#add in the year/month/day/hour to the meter_readings_with_station_df.  We need this to join things on these values for weather
meter_readings_with_station_df = meter_readings_with_station_df \
    .withColumn("ReadingYear", year("ReadingTimestamp")) \
    .withColumn("ReadingMonth", month("ReadingTimestamp")) \
    .withColumn("ReadingDay", dayofmonth("ReadingTimestamp")) \
    .withColumn("ReadingHour", hour("ReadingTimestamp"))\
    .withColumn("ReadingDayOfWeek", dayofweek("ReadingTimestamp"))

#now join the meter_readings with stations with the weather. 
meter_readings_with_weather_df = meter_readings_with_station_df.join(station_hourly_weather_df, (meter_readings_with_station_df.ReadingYear == station_hourly_weather_df.Year) & (meter_readings_with_station_df.ReadingMonth == station_hourly_weather_df.Month) & (meter_readings_with_station_df.ReadingDay == station_hourly_weather_df.Day) & (meter_readings_with_station_df.ReadingHour == station_hourly_weather_df.Hour),"inner") \
    .drop(station_hourly_weather_df.StationName)\
    .withColumn('ReadingDiffSeconds',abs(unix_timestamp("ReadingTimestamp") - unix_timestamp('StationDateTimeLocal'))) #add Time Diff




In [None]:
display(meter_readings_with_weather_df)

ReadingTimestamp,KWHConsumption,Latitude,Longitude,Grouping,MeterNumber,StationId,StationName,ReadingYear,ReadingMonth,ReadingDay,ReadingHour,ReadingDayOfWeek,StationLatitude,StationLongitude,ClimateID,StationDateTimeLocal,Year,Month,Day,TimeLocal,TemperatureC,DewPointC,RelativeHumidity,PrecipitationAmountmm,WindDirectionDegrees,WindSpeedKmh,VisibilityKm,StationPressurekPa,Humidex,WindChillC,Weather,Station Id,Hour,ReadingDiffSeconds
2022-04-12T20:15:00.000+0000,0.23,51,-114,11,M901917,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,900
2022-04-12T20:45:00.000+0000,0.06,51,-114,76,M889096,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,2700
2022-04-12T20:30:00.000+0000,0.14,51,-114,7,M932319,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,1800
2022-04-12T20:45:00.000+0000,0.19,51,-114,9,M884295,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,2700
2022-04-12T20:00:00.000+0000,0.09,51,-114,98,M979045,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,0
2022-04-12T20:30:00.000+0000,0.19,51,-114,18,M877398,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,1800
2022-04-12T20:00:00.000+0000,0.29,51,-114,36,M916763,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,0
2022-04-12T20:15:00.000+0000,0.23,51,-114,49,M1007124,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,900
2022-04-12T20:45:00.000+0000,0.01,51,-114,16,M993145,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,2700
2022-04-12T20:15:00.000+0000,0.06,51,-114,3,M875905,27211,CALGARY INT'L CS,2022,4,12,20,3,-114.0,51.11,3031094,2022-04-12T20:00:00.000+0000,2022,4,12,20:00,-9.8,-14.5,68.0,0.0,2.0,20.0,,89.9,,-18.0,,27211,20,900


## Fill missing times with just the previous value.

In [None]:
timeFillWindow = Window.partitionBy("MeterNumber").orderBy("ReadingTimestamp").rowsBetween(-sys.maxsize, 0)
#Todo

In [None]:
#build the training data set for ALL meters, grouped by timestamp

all_consumption_data_df = meter_readings_with_weather_df \
    .withColumn("ForecastTimestamp", date_trunc("hour", meter_readings_with_weather_df.ReadingTimestamp))\
    .select("ForecastTimestamp",\
            "KWHConsumption",\
            "TemperatureC",\
            "DewPointC",\
            "RelativeHumidity",\
            "PrecipitationAmountmm",\
            "WindDirectionDegrees",\
            "WindSpeedKmh",\
            "VisibilityKm",\
            "StationPressurekPa",\
            "Humidex",\
            "WindChillC",\
            "Weather") \
    .groupBy("ForecastTimestamp")\
    .agg(avg("TemperatureC").alias("TemperatureC"),\
        avg("DewPointC").alias("DewPointC"),\
         avg("RelativeHumidity").alias("RelativeHumidity"),\
         avg("PrecipitationAmountmm").alias("PrecipitationAmountmm"),\
         avg("WindDirectionDegrees").alias("WindDirectionDegrees"),\
         avg("WindSpeedKmh").alias("WindSpeedKmh"),\
         avg("VisibilityKm").alias("VisibilityKm"),\
         avg("StationPressurekPa").alias("StationPressurekPa"),\
         avg("Humidex").alias("Humidex"),\
         avg("WindChillC").alias("WindChillC"),\
         max("Weather").alias("Weather"),\
         sum("KWHConsumption").alias("KWHConsumption"))\
    .orderBy("ForecastTimeStamp")

In [None]:
#all_consumption_data_df.write.parquet('/mnt/lf/training/combined_consumptiondata_withweather_all.parquet')

In [None]:
#build the training dataframe, grouped by meter
meter_trainingdata_df = meter_readings_with_weather_df \
    .withColumn("ForecastTimestamp", date_trunc("hour", meter_readings_with_weather_df.ReadingTimestamp))\
    .select("MeterNumber", \
            "ForecastTimestamp",\
            "KWHConsumption",\
            "TemperatureC",\
            "DewPointC",\
            "RelativeHumidity",\
            "PrecipitationAmountmm",\
            "WindDirectionDegrees",\
            "WindSpeedKmh",\
            "VisibilityKm",\
            "StationPressurekPa",\
            "Humidex",\
            "WindChillC",\
            "Weather") \
    .groupBy("MeterNumber","ForecastTimestamp")\
    .agg(avg("TemperatureC").alias("TemperatureC"),\
        avg("DewPointC").alias("DewPointC"),\
         avg("RelativeHumidity").alias("RelativeHumidity"),\
         avg("PrecipitationAmountmm").alias("PrecipitationAmountmm"),\
         avg("WindDirectionDegrees").alias("WindDirectionDegrees"),\
         avg("WindSpeedKmh").alias("WindSpeedKmh"),\
         avg("VisibilityKm").alias("VisibilityKm"),\
         avg("StationPressurekPa").alias("StationPressurekPa"),\
         avg("Humidex").alias("Humidex"),\
         avg("WindChillC").alias("WindChillC"),\
         max("Weather").alias("Weather"),\
         sum("KWHConsumption").alias("KWHConsumption"))\
    .orderBy("MeterNumber","ForecastTimestamp")
                                          
#display(meter_trainingdata_df)



In [None]:
meter_trainingdata_df.write.partitionBy("MeterNumber").parquet('/mnt/lf/training/consumptiondata_withweather_by_meter.parquet')

In [None]:
#build out 'grouping' training data set.  These are consumption BY GROUP, rather than by meter
group_training_df = meter_readings_with_weather_df \
    .withColumn("ForecastTimestamp", date_trunc("hour", meter_readings_with_weather_df.ReadingTimestamp))\
    .select("Grouping", \
            "ForecastTimestamp",\
            "KWHConsumption",\
            "TemperatureC",\
            "DewPointC",\
            "RelativeHumidity",\
            "PrecipitationAmountmm",\
            "WindDirectionDegrees",\
            "WindSpeedKmh",\
            "VisibilityKm",\
            "StationPressurekPa",\
            "Humidex",\
            "WindChillC",\
            "Weather") \
    .groupBy("Grouping","ForecastTimestamp")\
    .agg(avg("TemperatureC").alias("TemperatureC"),\
        avg("DewPointC").alias("DewPointC"),\
         avg("RelativeHumidity").alias("RelativeHumidity"),\
         avg("PrecipitationAmountmm").alias("PrecipitationAmountmm"),\
         avg("WindDirectionDegrees").alias("WindDirectionDegrees"),\
         avg("WindSpeedKmh").alias("WindSpeedKmh"),\
         avg("VisibilityKm").alias("VisibilityKm"),\
         avg("StationPressurekPa").alias("StationPressurekPa"),\
         avg("Humidex").alias("Humidex"),\
         avg("WindChillC").alias("WindChillC"),\
         max("Weather").alias("Weather"),\
         sum("KWHConsumption").alias("KWHConsumption"))\
    .orderBy("Grouping","ForecastTimestamp")

group_training_df.write.partitionBy("Grouping").parquet('/mnt/lf/training/consumptiondata_withweather_by_group.parquet')