# Transformations de Silver en Gold

Ce Notebook effectue des transformations sur les données du Lakehouse Silver et enregistre les données transformées dans le Lakehouse Gold.

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [None]:
# Location of the wind_power_production Delta table in the Silver Lakehouse.
silver_table_path = "abfss://WindPowerGeneration@onelake.dfs.fabric.microsoft.com/LH_Silver.Lakehouse/Tables/wind_power_production"

# Read the Silver table as a Delta source into `df`.
# `spark` is the session object supplied by the notebook runtime.
df = spark.read.load(silver_table_path, format="delta")

In [None]:
# Column sets that describe each dimension, as they appear in the Silver table.
date_attributes = ["date", "day", "month", "quarter", "year"]
time_attributes = ["time", "hour_of_day", "minute_of_hour", "second_of_minute", "time_period"]
turbine_attributes = ["turbine_name", "capacity", "location_name", "latitude", "longitude", "region"]
status_attributes = ["status", "responsible_department"]

# 'Date' dimension: one row per distinct combination of the date columns;
# the `date` value itself is exposed as the key `date_id`.
date_dim = (
    df.select(*date_attributes)
    .distinct()
    .withColumnRenamed("date", "date_id")
)

# 'Time' dimension: same approach, with `time` exposed as the key `time_id`.
time_dim = (
    df.select(*time_attributes)
    .distinct()
    .withColumnRenamed("time", "time_id")
)

# Turbine dimension: distinct turbine attribute combinations, each given a
# sequential `turbine_id` from row_number() over the attributes in sorted order.
# NOTE: the window has no partitionBy, so Spark evaluates it on one partition.
turbine_ordering = Window.orderBy(*turbine_attributes)
turbine_dim = (
    df.select(*turbine_attributes)
    .distinct()
    .withColumn("turbine_id", row_number().over(turbine_ordering))
)

# Operational-status dimension: distinct (status, responsible_department)
# pairs, numbered the same way into `status_id`.
status_ordering = Window.orderBy(*status_attributes)
operational_status_dim = (
    df.select(*status_attributes)
    .distinct()
    .withColumn("status_id", row_number().over(status_ordering))
)

In [None]:
# Attach the surrogate keys (`turbine_id`, `status_id`) to every source row by
# left-joining the two dimension tables on their natural attribute columns.
# NOTE(review): these are plain equality joins, so a row with a NULL in any key
# column would not match and would get a NULL id - confirm the Silver data has
# no NULLs in these columns.
# NOTE: `df` is rebound here, so re-running only this cell joins against a frame
# that already carries `turbine_id` / `status_id`.
turbine_join_keys = ["turbine_name", "capacity", "location_name", "latitude", "longitude", "region"]
status_join_keys = ["status", "responsible_department"]

rows_with_turbine_id = df.join(turbine_dim, on=turbine_join_keys, how="left")
df = rows_with_turbine_id.join(operational_status_dim, on=status_join_keys, how="left")

In [None]:
# Build the fact table: keep the production measures plus the dimension keys,
# then rename `date` / `time` to the key names used by the date and time
# dimensions (`date_id`, `time_id`).
fact_columns = [
    "production_id",
    "date",
    "time",
    "turbine_id",
    "status_id",
    "wind_speed",
    "wind_direction",
    "energy_produced",
]
fact_table = df.select(*fact_columns)
for source_name, key_name in (("date", "date_id"), ("time", "time_id")):
    fact_table = fact_table.withColumnRenamed(source_name, key_name)

In [None]:
# Target locations of the star-schema tables in the Gold Lakehouse.
gold_date_dim_path = "abfss://WindPowerGeneration@onelake.dfs.fabric.microsoft.com/LH_Gold.Lakehouse/Tables/dim_date"
gold_time_dim_path = "abfss://WindPowerGeneration@onelake.dfs.fabric.microsoft.com/LH_Gold.Lakehouse/Tables/dim_time"
gold_turbine_dim_path = "abfss://WindPowerGeneration@onelake.dfs.fabric.microsoft.com/LH_Gold.Lakehouse/Tables/dim_turbine"
gold_operational_status_dim_path = "abfss://WindPowerGeneration@onelake.dfs.fabric.microsoft.com/LH_Gold.Lakehouse/Tables/dim_operational_status"
gold_fact_table_path = "abfss://WindPowerGeneration@onelake.dfs.fabric.microsoft.com/LH_Gold.Lakehouse/Tables/fact_wind_power_production"

# Persist each table as Delta, replacing whatever is already stored at the
# target path. Dimensions are written first, then the fact table.
gold_outputs = (
    (date_dim, gold_date_dim_path),
    (time_dim, gold_time_dim_path),
    (turbine_dim, gold_turbine_dim_path),
    (operational_status_dim, gold_operational_status_dim_path),
    (fact_table, gold_fact_table_path),
)
for frame, target_path in gold_outputs:
    frame.write.save(target_path, format="delta", mode="overwrite")