In [0]:
import re

In [0]:
# Start from a clean slate so widgets left over from earlier runs don't linger,
# then declare the two notebook parameters.
dbutils.widgets.removeAll()

DATASET_CHOICES = ["train", "test"]

# Schema (database) that later cells switch to with `use`.
dbutils.widgets.text("schema", "fa2023_group06_enefit_train")
# Which split to process; the first choice is the default.
dbutils.widgets.dropdown("dataset", DATASET_CHOICES[0], DATASET_CHOICES)

In [0]:
# Resolve the widget values once; later cells read these two names.
databaseName, dataset = (dbutils.widgets.get(widget) for widget in ("schema", "dataset"))

In [0]:
# Resolve a per-user DBFS working directory and switch to the configured schema.
# The e-mail local part is sanitized because characters such as '+' or '.' are legal in an
# address but awkward in a path; each run of them collapses to a single '_'.
# Raw string: the previous non-raw literal relied on "\." passing through unchanged, which
# Python flags as an invalid escape sequence. The character set is identical to before
# (it includes ',' because the old unescaped "+-/" was parsed as a character range).
_UNSAFE_LOCAL_PART = re.compile(r"[!#$%&'*+,\-./=?^`{}|]+")

userName = spark.sql("SELECT CURRENT_USER").first()[0]
userName0, _at_sign, userName1 = userName.partition("@")
userName0 = _UNSAFE_LOCAL_PART.sub('_', userName0)
# An identity without '@' used to raise IndexError on split("@")[1]; keep it (sanitized)
# instead of crashing.
userName = f'{userName0}@{userName1}' if _at_sign else userName0
userDir = f"/Users/{userName}/data"
dbutils.fs.mkdirs(userDir)

spark.sql(f"use {databaseName}")

print('UserDir ' + userDir)
print('userName '+ userName)
print('Using database '+ databaseName)

UserDir /Users/gordonhew@gmail.com/data
userName gordonhew@gmail.com
Using database fa2023_group06_enefit_test


In [0]:
if dataset == 'train':
  # Flatten each training row together with gas-price, electricity-price and client features
  # into one wide table. Output columns are prefixed by their source so names don't collide.
  #
  # Join keys:
  #   * gas prices: presumably one row per forecast day (median == average per day in
  #     gold_energy_prices_aggregate); joined on the calendar day after forecast_date.
  #   * electricity prices: several rows per forecast day (median != average per day in the
  #     same aggregate), so the old date-only join multiplied every training row. Join on the
  #     exact timestamp one day after forecast_date instead.
  #     NOTE(review): assumes forecast_date is aligned to the same hourly grid as a.datetime.
  #   * client: same product_type / county / is_business on the calendar day of a.datetime.
  flattened_sql = """
  select 
  a.county as {dataset}_county,
  a.is_business as {dataset}_is_business,
  a.product_type as {dataset}_product_type,
  a.target as {dataset}_target,
  a.is_consumption as {dataset}_is_consumption, 
  a.datetime as {dataset}_datetime,
  a.data_block_id as {dataset}_data_block_id,
  a.row_id as {dataset}_row_id,
  a.prediction_unit_id as {dataset}_prediction_unit_id,

  b.forecast_date as gas_prices_forecast_date,
  b.lowest_price_per_mwh as gas_prices_lowest_price_per_mwh,
  b.highest_price_per_mwh as gas_prices_highest_price_per_mwh,
  b.origin_date as gas_prices_origin_date,
  b.data_block_id as gas_prices_data_block_id,

  c.forecast_date as electric_prices_forecast_date,
  c.euros_per_mwh as electric_prices_euros_per_mwh,
  c.origin_date as electric_prices_origin_date,
  c.data_block_id as electric_prices_data_block_id,

  d.product_type as client_product_type,
  d.county as client_county,
  d.eic_count as client_eic_count,
  d.installed_capacity as client_installed_capacity,
  d.is_business as client_is_business,
  d.date as client_date,
  d.data_block_id as client_data_block_id

  from silver_{dataset} as a
  left join silver_gas_prices as b
    on date(a.datetime) = dateadd(b.forecast_date , 1)

  left join silver_electricity_prices as c
    on a.datetime = to_timestamp(c.forecast_date) + INTERVAL 1 DAY

  left join silver_client d on d.product_type = a.product_type and d.county = a.county and d.is_business = a.is_business
    and d.date = date(a.datetime)
  """.format(dataset=dataset)

  gold_all_features_df = spark.sql(flattened_sql)
  gold_all_features_df.write.mode('overwrite').saveAsTable('gold_all_features')

In [0]:
if dataset == "train":
  # An expression inside an `if` is not the cell's last top-level expression, so the
  # DataFrame was never rendered; display it explicitly.
  display(spark.sql("SELECT * FROM gold_all_features LIMIT 10"))

In [0]:
if dataset == "train":
  # Explicit display(): a bare expression nested in `if` produces no cell output.
  display(spark.sql("SELECT COUNT(1) FROM gold_all_features"))

## Creating/Merging/Upserting gold_energy_prices_aggregate

In [0]:
def create_merge_upsert(logical_table_name, select_sql, merge_upsert_sql):
  """Create or incrementally refresh the table ``gold_<logical_table_name>``.

  ``select_sql`` is run to produce the current data. If the gold table does not exist yet,
  the result is saved with ``saveAsTable`` in overwrite mode. Otherwise the result is
  registered as the temp view ``temp_<logical_table_name>`` and ``merge_upsert_sql`` is
  executed; that statement is expected to reference ``gold_<logical_table_name>`` and
  ``temp_<logical_table_name>`` by exactly those names.

  Args:
    logical_table_name: table name without the ``gold_`` / ``temp_`` prefix.
    select_sql: Spark SQL query producing the rows for the table.
    merge_upsert_sql: fully formatted MERGE statement applied when the table already exists.
  """
  data_df = spark.sql(select_sql)
  gold_table = 'gold_' + logical_table_name

  if not spark.catalog.tableExists(gold_table):
    print(gold_table, 'does not exist, creating')
    data_df.write.mode('overwrite').saveAsTable(gold_table)
    return

  print('merge/upsert into', gold_table)
  temp_table = 'temp_' + logical_table_name
  data_df.createOrReplaceTempView(temp_table)
  try:
    spark.sql(merge_upsert_sql)
  finally:
    # Drop the view even when the MERGE fails so no stale view is left in the session.
    spark.catalog.dropTempView(temp_table)

In [0]:
# Daily electricity and gas price statistics keyed by the calendar day of forecast_date.
# The FULL OUTER JOIN keeps days that appear in only one of the two price sources.
price_forecast_sql = """
WITH electricity_aggregate AS 
(
SELECT
  TO_DATE(forecast_date) forecast_date,
  MEDIAN(euros_per_mwh) AS electricity_price_per_mwh_median,
  AVG(euros_per_mwh) AS electricity_price_per_mwh_average
  FROM silver_electricity_prices
  GROUP BY TO_DATE(forecast_date)
),
gas_aggregate as 
(
  SELECT 
    TO_DATE(forecast_date) forecast_date,
    MEDIAN(lowest_price_per_mwh) AS gas_lowest_price_per_mwh_median,
    AVG(lowest_price_per_mwh) AS gas_lowest_price_per_mwh_average,
    MEDIAN(highest_price_per_mwh) AS gas_highest_price_per_mwh_median,
    AVG(highest_price_per_mwh) AS gas_highest_price_per_mwh_average
  FROM silver_gas_prices
  GROUP BY TO_DATE(forecast_date)
)
SELECT 
  coalesce(g.forecast_date, s.forecast_date) AS forecast_date,
  electricity_price_per_mwh_median,
  electricity_price_per_mwh_average,
  gas_lowest_price_per_mwh_median,
  gas_lowest_price_per_mwh_average,
  gas_highest_price_per_mwh_median,
  gas_highest_price_per_mwh_average
FROM 
  electricity_aggregate g 
  FULL OUTER JOIN gas_aggregate s ON g.forecast_date = s.forecast_date
ORDER BY forecast_date
"""

# Upsert the fresh aggregate into the gold table: update matching days, insert new days,
# and delete days no longer present in the source.
# Fix: gas_highest_price_per_mwh_average was previously set from the *_median column.
price_forecast_merge_upsert_sql = """
  MERGE INTO gold_{table_name}
  USING temp_{table_name}
  ON gold_{table_name}.forecast_date = temp_{table_name}.forecast_date
  WHEN MATCHED THEN
    UPDATE SET
      electricity_price_per_mwh_median = temp_{table_name}.electricity_price_per_mwh_median,
      electricity_price_per_mwh_average = temp_{table_name}.electricity_price_per_mwh_average,
      gas_lowest_price_per_mwh_median = temp_{table_name}.gas_lowest_price_per_mwh_median,
      gas_lowest_price_per_mwh_average = temp_{table_name}.gas_lowest_price_per_mwh_average,
      gas_highest_price_per_mwh_median = temp_{table_name}.gas_highest_price_per_mwh_median,
      gas_highest_price_per_mwh_average = temp_{table_name}.gas_highest_price_per_mwh_average
  WHEN NOT MATCHED
    THEN INSERT (
      forecast_date,
      electricity_price_per_mwh_median,
      electricity_price_per_mwh_average,
      gas_lowest_price_per_mwh_median,
      gas_lowest_price_per_mwh_average,
      gas_highest_price_per_mwh_median,
      gas_highest_price_per_mwh_average
    )
    VALUES (
      temp_{table_name}.forecast_date,
      temp_{table_name}.electricity_price_per_mwh_median,
      temp_{table_name}.electricity_price_per_mwh_average,
      temp_{table_name}.gas_lowest_price_per_mwh_median,
      temp_{table_name}.gas_lowest_price_per_mwh_average,
      temp_{table_name}.gas_highest_price_per_mwh_median,
      temp_{table_name}.gas_highest_price_per_mwh_average
    )
  WHEN NOT MATCHED BY SOURCE THEN
    DELETE    
  """.format(table_name='energy_prices_aggregate')

create_merge_upsert('energy_prices_aggregate', price_forecast_sql, price_forecast_merge_upsert_sql)

gold_energy_prices_aggregate  does not exist, creating


In [0]:
%sql
-- Ten most recent forecast days in the aggregated price table.
SELECT *
FROM gold_energy_prices_aggregate
ORDER BY forecast_date DESC
LIMIT 10

forecast_date,electricity_price_per_mwh_median,electricity_price_per_mwh_average,gas_lowest_price_per_mwh_median,gas_lowest_price_per_mwh_average,gas_highest_price_per_mwh_median,gas_highest_price_per_mwh_average
2023-05-30,67.38,55.55124999999999,29.0,29.0,34.0,34.0
2023-05-29,21.15,35.75708333333333,28.16,28.16,36.98,36.98
2023-05-28,65.055,44.45125,28.1,28.1,34.1,34.1
2023-05-27,82.7,62.56166666666667,28.3,28.3,34.1,34.1


In [0]:
%sql
-- Number of forecast days in the aggregated price table.
SELECT COUNT(1)
FROM gold_energy_prices_aggregate

count(1)
4


## Creating/Merging/Upserting gold_consumption_production_aggregate

In [0]:
if dataset == 'train':
  # Daily totals of consumption (is_consumption = 1) and production (is_consumption = 0)
  # from silver_train, plus net_amt = production - consumption.
  # FULL OUTER JOIN, matching the price aggregate: with an inner join a day that had only
  # consumption or only production rows was silently dropped, and coalesce(c.date, p.date)
  # could never pick p.date. For such one-sided days net_amt is NULL.
  consumption_production_sql = """
  WITH total_consumption AS (
    SELECT 
      TO_DATE(datetime) AS date,
      sum(target) AS total_consumption_amt
    FROM silver_train
    WHERE 
      is_consumption = 1
    GROUP BY date    
  ),
  total_production AS (
    SELECT 
        TO_DATE(datetime) AS date,
        sum(target) AS total_production_amt
      FROM silver_train
      WHERE 
        is_consumption = 0
      GROUP BY date
  )
  SELECT 
    coalesce(c.date, p.date) AS date,
    total_consumption_amt,
    total_production_amt,
    total_production_amt - total_consumption_amt AS net_amt
  FROM 
    total_consumption c FULL OUTER JOIN total_production p ON c.date = p.date
  ORDER BY date ASC
  """

  # Update matching days, insert new days, delete days no longer produced by the query.
  consumption_production_merge_upsert_sql = """
    MERGE INTO gold_consumption_production_aggregate
    USING temp_consumption_production_aggregate
    ON gold_consumption_production_aggregate.date = temp_consumption_production_aggregate.date
    WHEN MATCHED THEN
      UPDATE SET
        total_consumption_amt = temp_consumption_production_aggregate.total_consumption_amt,
        total_production_amt = temp_consumption_production_aggregate.total_production_amt,
        net_amt = temp_consumption_production_aggregate.net_amt
    WHEN NOT MATCHED
      THEN INSERT (
        date,
        total_consumption_amt,
        total_production_amt,
        net_amt
      )
      VALUES (
        temp_consumption_production_aggregate.date,
        temp_consumption_production_aggregate.total_consumption_amt,
        temp_consumption_production_aggregate.total_production_amt,
        temp_consumption_production_aggregate.net_amt
      )
    WHEN NOT MATCHED BY SOURCE THEN
      DELETE 
  """

  create_merge_upsert('consumption_production_aggregate', consumption_production_sql, consumption_production_merge_upsert_sql)

merge/upsert into gold_consumption_production_aggregate


In [0]:
if dataset == 'train':
  # Show the ten most recent days of the consumption/production aggregate.
  latest_days_df = spark.sql("SELECT * FROM gold_consumption_production_aggregate ORDER BY date DESC LIMIT 10")
  display(latest_days_df)

date,total_consumption_amt,total_production_amt,net_amt
2023-05-31,612491.0589999998,667617.9430000002,55126.88400000043
2023-05-30,638906.3710000002,584241.7649999999,-54664.60600000026
2023-05-29,603023.7739999997,621032.624,18008.85000000021
2023-05-28,415896.57300000015,764474.0430000003,348577.47000000015
2023-05-27,427683.894,771194.5780000004,343510.6840000005
2023-05-26,615020.3289999999,623860.5640000001,8840.235000000219
2023-05-25,695600.1629999997,411391.52599999984,-284208.6369999999
2023-05-24,621441.0129999998,625928.0669999999,4487.05400000012
2023-05-23,637593.071,546112.0890000003,-91480.98199999973
2023-05-22,638716.2619999999,510611.4979999999,-128104.76400000002


In [0]:
if dataset == 'train':
  # Number of rows (one per date) in the consumption/production aggregate.
  row_count_df = spark.sql("SELECT COUNT(1) FROM gold_consumption_production_aggregate")
  display(row_count_df)

count(1)
637


## Stream data to gold train/test dataset

In [0]:
# Copy silver_<dataset> into gold_<dataset> with a one-shot streaming query. The checkpoint
# directory under userDir lets Structured Streaming track progress across runs.
dataset_table = "gold_" + dataset

checkpoint_path = userDir + '/final_project/' + databaseName + '/_checkpoint/' + dataset_table

print('checkpoint path', checkpoint_path)

# NOTE: despite its name this holds the StreamingQuery handle returned by toTable(), not a
# DataFrame; the name is kept in case other cells refer to it.
input_stream_df = spark.readStream \
  .table('silver_' + dataset) \
  .writeStream \
  .option("checkpointLocation", checkpoint_path) \
  .outputMode("append") \
  .trigger(once=True) \
  .toTable(dataset_table)

# toTable() starts the query asynchronously; block until the single trigger finishes so the
# following cells read a complete gold table rather than a partially written one.
input_stream_df.awaitTermination()

checkpoint path /Users/gordonhew@gmail.com/data/final_project/fa2023_group06_enefit_train_ghew/_checkpoint/gold_train


In [0]:
# Preview of the gold split table. It is filled by the streaming write above, so rows can be
# missing here if that query has not finished yet.
preview_query = f'SELECT * FROM gold_{dataset} LIMIT 10'
display(spark.sql(preview_query))

county,is_business,product_type,target,is_consumption,datetime,data_block_id,row_id,prediction_unit_id
2,1,3,2066.227,1,2022-04-06T03:00:00Z,217,667155,10
8,0,1,0.0,0,2022-04-06T03:00:00Z,217,667198,31
10,1,1,78.223,1,2022-04-06T04:00:00Z,217,667353,40
11,1,0,0.0,0,2022-04-06T08:00:00Z,217,667908,67
7,0,1,10.036,0,2022-04-06T09:00:00Z,217,668002,25
11,1,3,305.842,0,2022-04-06T09:00:00Z,217,668050,48
6,1,3,130.571,1,2022-04-06T12:00:00Z,217,668409,24
5,1,3,306.425,1,2022-04-06T14:00:00Z,217,668679,23
7,0,1,23.261,1,2022-04-06T16:00:00Z,217,668955,25
0,1,0,352.52,1,2022-04-06T19:00:00Z,217,669311,3


In [0]:
# Row count of the gold split table; may undercount while the streaming write above is running.
count_query = f'SELECT COUNT(1) FROM gold_{dataset}'
display(spark.sql(count_query))

count(1)
4712332
