In [0]:
# Declare the notebook parameters (both default to an empty string) before reading them.
for _widget_name in ("start_date", "ind_range"):
    dbutils.widgets.text(_widget_name, "")

# Any non-empty value makes the run use the explicit start_date widget instead of today.
ind_range = dbutils.widgets.get("ind_range")

In [0]:
%run "./nb_utils"

In [0]:
# Pipeline configuration and logger for the electrical measurements domain.
ingestion_config = MedallionConfig("measurement_electrical")
ingestion_config.set_logger()

# Destination of the gold table written at the end of this notebook.
gold_path_file = os.path.join(ingestion_config.gold_path, 'quoia_360')
logging.info(f"Table to be created in: {gold_path_file} \n ")

# Resolve the reference date: a non-empty ``ind_range`` widget means "use the
# explicit start_date widget"; otherwise the current month is processed.
if ind_range:
    start_date_raw = dbutils.widgets.get("start_date").strip()
    if not start_date_raw:
        # pd.to_datetime("") returns NaT rather than raising, which would not
        # yield a usable month range for the SQL date filters below.
        raise ValueError(
            "Widget 'ind_range' is set but widget 'start_date' is empty; "
            "provide a start date such as 2024-06-01."
        )
    start_date = pd.to_datetime(start_date_raw).date()
else:
    start_date = pd.to_datetime("today").date()  # Get today's date without time

# The whole calendar month containing start_date is (re)processed.
initial_day_of_month = start_date.replace(day=1)
# MonthEnd(0) rolls forward to the end of the month (no-op if already on it).
last_day_of_month = (initial_day_of_month + pd.offsets.MonthEnd(0)).date()

logging.info(f"Range to execute:  between {initial_day_of_month} and {last_day_of_month} \n ")

# Temp view `state_t0`: one row per (date, hour, node_id) with 0/1 flags that
# show whether each normalised state appeared at least once in that hour.
# - The CASE maps states by prefix. Anything else, including a NULL state,
#   falls into ELSE and counts as 'Standby'.
# - MAX over the group means several flags can be 1 in the same hour.
# - Rows are restricted to the month bounded by initial_day_of_month and
#   last_day_of_month, interpolated into the SQL text.
state_view = f"""
CREATE OR REPLACE TEMP VIEW state_t0 AS
WITH temp AS (
  SELECT
    CASE
      WHEN state LIKE 'Grid-connected%' THEN 'Grid-connected'
      WHEN state LIKE 'Standby%' THEN 'Standby'
      WHEN state LIKE 'Shutdown%' THEN 'Shutdown'
      ELSE 'Standby'
    END AS state,
    date,
    EXTRACT(HOUR FROM time) AS hour,
    node_id
  FROM hive_metastore.quoia.measurement_state
  where date BETWEEN '{initial_day_of_month}' and '{last_day_of_month}'
)
SELECT
  date,
  hour,
  node_id,
  MAX(CASE WHEN state = 'Grid-connected' THEN 1 ELSE 0 END) AS Ind_Grid_Connected,
  MAX(CASE WHEN state = 'Standby' THEN 1 ELSE 0 END)        AS Ind_Standby,
  MAX(CASE WHEN state = 'Shutdown' THEN 1 ELSE 0 END)       AS Ind_Shutdown
FROM temp
GROUP BY date, hour, node_id; """

# Temp view `temperature_t1`: average temperature per (date, hour, node_id)
# for the processed month.
temperature_view = f"""
CREATE OR REPLACE TEMP VIEW temperature_t1 AS
SELECT 
  date,
  EXTRACT(HOUR FROM time) AS hour,
  node_id,
  AVG(temperature) AS temperature
FROM hive_metastore.quoia.measurement_temperature
where date  BETWEEN '{initial_day_of_month}' and '{last_day_of_month}'
GROUP BY date, hour, node_id;"""

# Temp view `node_t2`: node attributes plus the installation_type from
# meter_electric. There is no date filter here.
# The INNER JOIN drops nodes whose meter_id has no meter_electric row.
# NOTE(review): if meter_electric can hold several rows per meter_id, this view
# has several rows per node_id and fans out the final LEFT JOIN on node_id.
# Confirm meter_id is unique there.
node_view = f"""
CREATE OR REPLACE TEMP VIEW node_t2 AS
SELECT 
  n.id AS node_id,
  n.meter_id,
  n.category,
  n.archived,
  me.installation_type
FROM hive_metastore.quoia.node n
INNER JOIN  hive_metastore.quoia.meter_electric me 
  ON me.meter_id = n.meter_id;"""

# Temp view `electrical_t3`: per-phase (1..3) electrical readings, SUMmed per
# (node_id, date, hour) for the processed month.
# NOTE(review): voltage (vp*), current (cp*) and power factor (pfp*) are summed.
# With more than one reading per hour, a sum of levels or ratios is not a
# level; the temperature view uses AVG. Confirm SUM is intended here.
electrical_view = f"""
CREATE OR REPLACE TEMP VIEW electrical_t3 AS
SELECT 
  node_id,
	date,
	extract(hour from time) as hour,
	sum(a.vp1) as vp1,
	sum(a.vp2) as vp2,
	sum(a.vp3) as vp3,
	sum(a.cp1) as cp1,
	sum(a.cp2) as cp2,
	sum(a.cp3) as cp3,
	sum(a.app1) as app1,
	sum(a.app2) as app2,
	sum(a.app3) as app3,
	sum(a.rpp1) as rpp1,
	sum(a.rpp2) as rpp2,
	sum(a.rpp3) as rpp3,
	sum(a.pfp1) as pfp1,
	sum(a.pfp2) as pfp2,
	sum(a.pfp3) as pfp3
from hive_metastore.quoia.measurement_electrical a
where date  BETWEEN '{initial_day_of_month}' and '{last_day_of_month}'
group by node_id, date, hour;"""

# Temp view `electrical_energy_t4`: per-phase (1..3) energy columns, SUMmed
# per (node_id, date, hour) for the processed month.
# SUM returns NULL for a group whose inputs are all NULL; the final join
# COALESCEs those to 0.
energy_view = f"""
CREATE OR REPLACE TEMP VIEW electrical_energy_t4 AS
SELECT 
  node_id,
	date,
	extract(hour from time) as hour,
	SUM(b.iaepd1) AS iaepd1,
	SUM(b.iaepd2) AS iaepd2,
	SUM(b.iaepd3) AS iaepd3,
	SUM(b.eaepd1) AS eaepd1,
	SUM(b.eaepd2) AS eaepd2,
	SUM(b.eaepd3) AS eaepd3,
	SUM(b.irepd1) AS irepd1,
	SUM(b.irepd2) AS irepd2,
	SUM(b.irepd3) AS irepd3,
	SUM(b.erepd1) AS erepd1,
	SUM(b.erepd2) AS erepd2,
	SUM(b.erepd3) AS erepd3
from hive_metastore.quoia.measurement_electrical_energy b
where date  BETWEEN '{initial_day_of_month}' and '{last_day_of_month}'
group by node_id, date, hour;"""

logging.info("Executing queries \n ")
# Register every temp view (same order as before) so the final join can reference them.
for _view_sql in (state_view, temperature_view, electrical_view, energy_view, node_view):
    spark.sql(_view_sql)

# Final hourly dataset: one row per (node_id, date, hour) that exists in BOTH
# electrical_t3 and electrical_energy_t4 (INNER JOIN), enriched with the state
# flags, temperature and node attributes through LEFT JOINs.
# - Missing state flags and temperature are filled with the sentinel -1.
# - Missing node attributes stay NULL.
# NOTE(review): because of the INNER JOIN, COALESCE(b.*, 0) only replaces SUMs
# that were NULL (all-NULL inputs). It does not fill hours that have no energy
# row, because those hours are dropped. Confirm the inner join (rather than a
# LEFT JOIN) is intended.
# NOTE(review): -1 is also a plausible real temperature value, so the
# Temperature sentinel is ambiguous for consumers.
df = spark.sql("""
  SELECT
    a.node_id,
    e.category,
    e.archived,
    e.installation_type,
    a.date,
    a.hour,
    COALESCE(a.vp1, 0) AS vp1,
    COALESCE(a.vp2, 0) AS vp2,
    COALESCE(a.vp3, 0) AS vp3,
    COALESCE(a.cp1, 0) AS cp1,
    COALESCE(a.cp2, 0) AS cp2,
    COALESCE(a.cp3, 0) AS cp3,
    COALESCE(a.app1, 0) AS app1,
    COALESCE(a.app2, 0) AS app2,
    COALESCE(a.app3, 0) AS app3,
    COALESCE(a.rpp1, 0) AS rpp1,
    COALESCE(a.rpp2, 0) AS rpp2,
    COALESCE(a.rpp3, 0) AS rpp3,
    COALESCE(a.pfp1, 0) AS pfp1,
    COALESCE(a.pfp2, 0) AS pfp2,
    COALESCE(a.pfp3, 0) AS pfp3,
    COALESCE(b.iaepd1, 0) AS iaepd1,
    COALESCE(b.iaepd2, 0) AS iaepd2,
    COALESCE(b.iaepd3, 0) AS iaepd3,
    COALESCE(b.eaepd1, 0) AS eaepd1,
    COALESCE(b.eaepd2, 0) AS eaepd2,
    COALESCE(b.eaepd3, 0) AS eaepd3,
    COALESCE(b.irepd1, 0) AS irepd1,
    COALESCE(b.irepd2, 0) AS irepd2,
    COALESCE(b.irepd3, 0) AS irepd3,
    COALESCE(b.erepd1, 0) AS erepd1,
    COALESCE(b.erepd2, 0) AS erepd2,
    COALESCE(b.erepd3, 0) AS erepd3,
    COALESCE(c.Ind_Grid_Connected, -1)  AS Ind_Grid_Connected,
    COALESCE(c.Ind_Standby, -1)         AS Ind_Standby,
    COALESCE(c.Ind_Shutdown, -1)        AS Ind_Shutdown,
    COALESCE(d.temperature, -1)         AS Temperature
  FROM electrical_t3 a
  INNER JOIN electrical_energy_t4  b
    ON a.node_id = b.node_id AND a.date = b.date AND a.hour = b.hour
  LEFT JOIN state_t0 c
    ON a.node_id = c.node_id AND a.date = c.date AND a.hour = c.hour
  LEFT JOIN temperature_t1 d
    ON a.node_id = d.node_id AND a.date = d.date AND a.hour = d.hour
  LEFT JOIN node_t2 e
    ON a.node_id = e.node_id
""")

# Persist the joined result so the count and the write share one evaluation
# of the temp-view join. Without this, the whole plan is computed twice, and
# the logged count could differ from what is written if the sources change.
df.persist()
try:
    logging.info(f"Data to be written, count: {df.count()}")

    # Dynamic partition overwrite replaces only the `date` partitions present
    # in df; other partitions of the gold table are left untouched.
    # NOTE(review): a day in the processed month that now has no rows keeps
    # its previously written partition.
    (
        df.write.format(ingestion_config.gold_format)
        .partitionBy("date")
        .mode("overwrite")
        .option("partitionOverwriteMode", "dynamic")
        .save(gold_path_file)
    )
finally:
    # Release the cached blocks whether or not the write succeeded.
    df.unpersist()

logging.info(f"Data written to gold - delta: {gold_path_file} \n ")

ingestion_config.create_schema_table(gold_path_file, "quoia_360")


2024-09-15 00:01:40,148 - root - INFO - Table to be created in: abfss://gold@datasparkcourse.dfs.core.windows.net/quoia_360 
 
2024-09-15 00:01:40,151 - root - INFO - Range to execute:  between 2024-06-01 and 2024-06-30 
 
2024-09-15 00:01:40,152 - root - INFO - Executing queries 
 
2024-09-15 00:01:44,580 - root - INFO - Data to be written, count: 20852
2024-09-15 00:01:56,158 - root - INFO - Data written to gold - delta: abfss://gold@datasparkcourse.dfs.core.windows.net/quoia_360 
 
