# SILVER TO GOLD TRANSFORMATION
This notebook transforms data from the Silver Lakehouse and saves the transformed data to the Gold Lakehouse.

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

StatementMeta(, 8a24a5b0-a360-4f7b-9b69-291afd0c66df, 3, Finished, Available, Finished)

In [2]:
# Location of the wind_power_production Delta table in the Silver Lakehouse
silver_table_path = "abfss://WINDPOWERGENERATION@onelake.dfs.fabric.microsoft.com/LH_SILVER.Lakehouse/Tables/wind_power_production"

# Read the Silver table into a DataFrame; every later cell starts from `df`
df = spark.read.load(silver_table_path, format="delta")

StatementMeta(, 8a24a5b0-a360-4f7b-9b69-291afd0c66df, 4, Finished, Available, Finished)

In [3]:
# Build the Gold dimension tables from the Silver DataFrame `df`.

# Date dimension: one row per distinct date; the date value itself is the key.
date_dim = (
    df.select("date", "day", "month", "quarter", "year")
      .distinct()
      .withColumnRenamed("date", "date_id")
)

# Time dimension: one row per distinct time; the time value itself is the key.
time_dim = (
    df.select("time", "hour_of_day", "minute_of_hour", "second_of_minute", "time_period")
      .distinct()
      .withColumnRenamed("time", "time_id")
)

# Turbine dimension.
# The attribute list is declared once and reused for both the projection and the
# window ordering, so the two cannot drift apart.
turbine_attrs = [
    "turbine_name",
    "capacity",
    "location_name",
    "latitude",
    "longitude",
    "region",
]

# Ordering by every attribute of already-distinct rows gives a total order, so
# row_number() assigns the same turbine_id to the same row for a given input.
# The window has no partitionBy, so all rows are ranked together in one
# sequence (Spark moves them to a single partition to do so).
# NOTE(review): presumably fine because the dimension is small - confirm.
turbine_window = Window.orderBy(*turbine_attrs)

turbine_dim = (
    df.select(*turbine_attrs)
      .distinct()
      .withColumn("turbine_id", row_number().over(turbine_window))
)

# Operational status dimension, built the same way with its own window
# (a separate name avoids rebinding one variable to two unrelated windows).
status_attrs = ["status", "responsible_department"]
status_window = Window.orderBy(*status_attrs)

operational_status_dim = (
    df.select(*status_attrs)
      .distinct()
      .withColumn("status_id", row_number().over(status_window))
)




StatementMeta(, 8a24a5b0-a360-4f7b-9b69-291afd0c66df, 5, Finished, Available, Finished)

In [4]:
# Attach the dimension surrogate keys (turbine_id, status_id) to the source DataFrame.

# Make the cell safe to re-run: this cell overwrites `df`, so on a second
# execution `df` already carries the key columns and joining again would
# produce duplicate, ambiguous columns. drop() is a no-op for columns that are
# not present, so the first run is unaffected.
df = df.drop("turbine_id", "status_id")

# NOTE(review): both joins below compare the key columns with plain equality,
# so a source row holding NULL in any join column will not match its dimension
# row and ends up with a NULL id - confirm the Silver data has no NULLs here.

# Join columns for turbine_dim
turbine_join_cols = ["turbine_name", "capacity", "location_name", "latitude", "longitude", "region"]

# Left join with turbine_dim: every source row is kept and gains turbine_id
df = df.join(
    turbine_dim.select(*turbine_join_cols, "turbine_id"),  # include the ID column
    on=turbine_join_cols,
    how="left"
)

# Join columns for operational_status_dim
status_join_cols = ["status", "responsible_department"]

# Left join with operational_status_dim: every source row is kept and gains status_id
df = df.join(
    operational_status_dim.select(*status_join_cols, "status_id"),  # include the ID column
    on=status_join_cols,
    how="left"
)


StatementMeta(, 8a24a5b0-a360-4f7b-9b69-291afd0c66df, 6, Finished, Available, Finished)

In [5]:
# Assemble the fact table: the production identifier, the dimension keys and
# the measures. The date and time columns are exposed under their key names.
fact_table = df.select(
    "production_id",
    col("date").alias("date_id"),
    col("time").alias("time_id"),
    "turbine_id",
    "status_id",
    "wind_speed",
    "wind_direction",
    "energy_produced",
)


StatementMeta(, 8a24a5b0-a360-4f7b-9b69-291afd0c66df, 7, Finished, Available, Finished)

In [6]:
# Target locations of the Delta tables in the Gold Lakehouse
gold_date_dim_path = "abfss://WINDPOWERGENERATION@onelake.dfs.fabric.microsoft.com/LH_GOLD.Lakehouse/Tables/dim_date"
gold_time_dim_path = "abfss://WINDPOWERGENERATION@onelake.dfs.fabric.microsoft.com/LH_GOLD.Lakehouse/Tables/dim_time"
gold_turbine_dim_path = "abfss://WINDPOWERGENERATION@onelake.dfs.fabric.microsoft.com/LH_GOLD.Lakehouse/Tables/dim_turbine"
gold_operational_status_dim_path = "abfss://WINDPOWERGENERATION@onelake.dfs.fabric.microsoft.com/LH_GOLD.Lakehouse/Tables/dim_operational_status"
gold_fact_table_path = "abfss://WINDPOWERGENERATION@onelake.dfs.fabric.microsoft.com/LH_GOLD.Lakehouse/Tables/fact_wind_power_production"

# Write each table to its Gold location, replacing any previous contents.
# The order is kept: the four dimensions first, the fact table last.
for frame, target_path in [
    (date_dim, gold_date_dim_path),
    (time_dim, gold_time_dim_path),
    (turbine_dim, gold_turbine_dim_path),
    (operational_status_dim, gold_operational_status_dim_path),
    (fact_table, gold_fact_table_path),
]:
    frame.write.format("delta").mode("overwrite").save(target_path)

StatementMeta(, 8a24a5b0-a360-4f7b-9b69-291afd0c66df, 8, Finished, Available, Finished)