# Transformation Silver → Gold

##  Objectif
Créer un modèle dimensionnel (star schema) optimisé pour l'analyse.

##  Modèle créé
- **4 dimensions** : Date, Time, Turbine, Operational Status
- **1 fait** : Production éolienne

##  Dépendances
- **Input** : LH_Wind_Power_Silver.dbo.wind_power
- **Output** : LH_Wind_Power_Gold (5 tables)

In [13]:
from pyspark.sql.functions import col, count, row_number, when
from pyspark.sql.window import Window

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 15, Finished, Available, Finished, False)

In [2]:
# OneLake location of the Silver-layer wind_power Delta table (input of this notebook).
silver_table_path = "abfss://WindPowerAnalytics1@onelake.dfs.fabric.microsoft.com/LH_Wind_Power_Silver.Lakehouse/Tables/dbo/wind_power"

# Read the Silver table as a Delta source; the dimensions and the fact table
# built in the following cells are all derived from `df`.
delta_reader = spark.read.format("delta")
df = delta_reader.load(silver_table_path)

# Report how many Silver rows were loaded.
print(f" Données chargées : {df.count()} lignes")

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 4, Finished, Available, Finished, False)

 Données chargées : 864 lignes


In [3]:
# Build dim_date: the distinct (date, day, month, quarter, year) combinations
# found in Silver, with the "date" column exposed as the key "date_id".
date_dim = df.select(
    col("date").alias("date_id"),
    "day",
    "month",
    "quarter",
    "year",
).dropDuplicates()

# Row count plus a small preview of the dimension.
print(f" dim_date : {date_dim.count()} dates uniques")
date_dim.show(5)

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 5, Finished, Available, Finished, False)

 dim_date : 2 dates uniques
+-------------------+---+-----+-------+----+
|            date_id|day|month|quarter|year|
+-------------------+---+-----+-------+----+
|2024-06-16 00:00:00| 16|    6|      2|2024|
|2024-06-15 00:00:00| 15|    6|      2|2024|
+-------------------+---+-----+-------+----+



In [4]:
# Build dim_time: the distinct time-of-day attribute combinations found in
# Silver, with the "time" column exposed as the key "time_id".
time_dim = df.select(
    col("time").alias("time_id"),
    "hour_of_day",
    "minute_of_hour",
    "second_of_minute",
    "time_period",
).dropDuplicates()

# Row count plus a small preview of the dimension.
print(f" dim_time : {time_dim.count()} temps uniques")
time_dim.show(5)

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 6, Finished, Available, Finished, False)

 dim_time : 144 temps uniques
+--------+-----------+--------------+----------------+-----------+
| time_id|hour_of_day|minute_of_hour|second_of_minute|time_period|
+--------+-----------+--------------+----------------+-----------+
|23:40:00|         23|            40|               0|      Night|
|16:10:00|         16|            10|               0|  Afternoon|
|16:50:00|         16|            50|               0|  Afternoon|
|22:30:00|         22|            30|               0|      Night|
|15:50:00|         15|            50|               0|  Afternoon|
+--------+-----------+--------------+----------------+-----------+
only showing top 5 rows



In [5]:
# Build dim_turbine: one row per distinct turbine attribute combination, with
# an integer surrogate key "turbine_id" assigned by row_number().
turbine_dim = (df
    .select("turbine_name", "capacity", "location_name", "latitude", "longitude", "region")
    .distinct()
    .withColumn("turbine_id",
        row_number().over(
            # Order by EVERY selected attribute so the ordering is total.
            # With only (turbine_name, capacity, location_name), two rows that
            # differ solely in latitude/longitude/region would tie, and their
            # IDs could change each time this lazy DataFrame is recomputed
            # (show, join, write), desynchronising the dimension from the fact.
            # NOTE: a window without partitionBy pulls all rows into a single
            # partition; acceptable here because the dimension is tiny.
            Window.orderBy(
                "turbine_name", "capacity", "location_name",
                "latitude", "longitude", "region"
            )
        )
    )
)

# Row count plus the full (small) dimension.
print(f" dim_turbine : {turbine_dim.count()} turbines")
turbine_dim.show()

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 7, Finished, Available, Finished, False)

 dim_turbine : 3 turbines
+------------+--------+-------------+--------+---------+--------+----------+
|turbine_name|capacity|location_name|latitude|longitude|  region|turbine_id|
+------------+--------+-------------+--------+---------+--------+----------+
|   Turbine A|    2200|   Location 1| 34.0522|-118.2437|Region A|         1|
|   Turbine B|    2000|   Location 2| 36.7783|-119.4179|Region B|         2|
|   Turbine C|    2500|   Location 3| 40.7128|  -74.006|Region C|         3|
+------------+--------+-------------+--------+---------+--------+----------+



In [6]:
# Build dim_operational_status: the distinct (status, responsible_department)
# pairs, numbered by row_number() into the surrogate key "status_id".
# The window orders by both selected columns, so the numbering is a total order.
operational_status_dim = (
    df.select("status", "responsible_department")
    .dropDuplicates()
    .select(
        "*",
        row_number()
        .over(Window.orderBy("status", "responsible_department"))
        .alias("status_id"),
    )
)

# Row count plus the full (small) dimension.
print(f" dim_operational_status : {operational_status_dim.count()} statuts")
operational_status_dim.show()

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 8, Finished, Available, Finished, False)

 dim_operational_status : 6 statuts
+--------------------+----------------------+---------+
|              status|responsible_department|status_id|
+--------------------+----------------------+---------+
|               Fault|           Engineering|        1|
|          Inspection|           Maintenance|        2|
|             Offline|            Operations|        3|
|              Online|            Operations|        4|
|Preventive Mainte...|           Maintenance|        5|
|Reactive Maintenance|           Maintenance|        6|
+--------------------+----------------------+---------+



In [7]:
# Attach the surrogate keys to every Silver row by joining back to the two
# keyed dimensions on their natural attributes. Left joins keep every Silver
# row; a row with no match ends up with a NULL turbine_id / status_id.
df_with_keys = df.join(
    turbine_dim,
    on=["turbine_name", "capacity", "location_name", "latitude", "longitude", "region"],
    how="left",
).join(
    operational_status_dim,
    on=["status", "responsible_department"],
    how="left",
)

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 9, Finished, Available, Finished, False)

In [8]:
# Fact table: keep only the production identifier, the four dimension keys
# and the measures. "date" and "time" are exposed under the key names used
# by dim_date / dim_time.
fact_table = df_with_keys.select(
    col("production_id"),
    col("date").alias("date_id"),
    col("time").alias("time_id"),
    col("turbine_id"),
    col("status_id"),
    col("wind_speed"),
    col("wind_direction"),
    col("energy_produced"),
)

# Row count plus a preview of the fact rows.
print(f" fact_wind_power : {fact_table.count()} mesures")
fact_table.show(10)

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 10, Finished, Available, Finished, False)

 fact_wind_power : 864 mesures
+-------------+-------------------+--------+----------+---------+----------+--------------+---------------+
|production_id|            date_id| time_id|turbine_id|status_id|wind_speed|wind_direction|energy_produced|
+-------------+-------------------+--------+----------+---------+----------+--------------+---------------+
|         6535|2024-06-16 00:00:00|03:00:00|         1|        4|     17.18|            SW|        1896.51|
|         6536|2024-06-16 00:00:00|03:00:00|         2|        4|     18.43|            SE|        1396.19|
|         6537|2024-06-16 00:00:00|03:00:00|         3|        6|     19.28|            NW|            0.0|
|         6538|2024-06-16 00:00:00|03:10:00|         1|        4|      4.49|            SW|        1648.24|
|         6539|2024-06-16 00:00:00|03:10:00|         2|        4|      6.87|             N|        1668.04|
|         6540|2024-06-16 00:00:00|03:10:00|         3|        4|      3.73|             N|        1593.8

In [9]:
# Root of the Gold lakehouse tables in OneLake.
base_path = "abfss://WindPowerAnalytics1@onelake.dfs.fabric.microsoft.com/LH_Wind_Power_Gold.Lakehouse/Tables/dbo"

# One target path per Gold table (4 dimensions + 1 fact), each directly under base_path.
gold_date_dim_path = "/".join((base_path, "dim_date"))
gold_time_dim_path = "/".join((base_path, "dim_time"))
gold_turbine_dim_path = "/".join((base_path, "dim_turbine"))
gold_operational_status_dim_path = "/".join((base_path, "dim_operational_status"))
gold_fact_table_path = "/".join((base_path, "fact_wind_power"))

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 11, Finished, Available, Finished, False)

In [10]:
# Persist every Gold table as Delta, replacing any previous content.
# Write order is kept: the four dimensions first, then the fact table.
for table_df, table_path in (
    (date_dim, gold_date_dim_path),
    (time_dim, gold_time_dim_path),
    (turbine_dim, gold_turbine_dim_path),
    (operational_status_dim, gold_operational_status_dim_path),
    (fact_table, gold_fact_table_path),
):
    table_df.write.format("delta").mode("overwrite").save(table_path)

print(" Toutes les tables créées dans Gold !")

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 12, Finished, Available, Finished, False)

 Toutes les tables créées dans Gold !


In [14]:
print("===  Vérification intégrité référentielle ===\n")

# Distinct-key counts on both sides. This is only a quick overview: equal
# counts do not prove that the fact keys actually exist in the dimension.
fact_dates = fact_table.select("date_id").distinct().count()
dim_dates = date_dim.count()
print(f" Dates - Fait: {fact_dates}, Dim: {dim_dates}")

fact_turbines = fact_table.select("turbine_id").distinct().count()
dim_turbines = turbine_dim.count()
print(f" Turbines - Fait: {fact_turbines}, Dim: {dim_turbines}")

# Count NULL foreign keys in the fact table (one output column per key).
null_check = fact_table.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in ["date_id", "time_id", "turbine_id", "status_id"]
])
print("\n Clés nulles dans fact :")
null_check.show()

# Actual referential-integrity check, for all four dimensions: a left-anti
# join keeps the fact rows whose key has no match in the dimension (rows with
# a NULL key never match, so they are counted here as well).
dimension_by_key = {
    "date_id": date_dim,
    "time_id": time_dim,
    "turbine_id": turbine_dim,
    "status_id": operational_status_dim,
}
orphan_counts = {
    key: fact_table.join(dim.select(key).distinct(), on=key, how="left_anti").count()
    for key, dim in dimension_by_key.items()
}
print(" Clés orphelines dans fact :")
for key, orphans in orphan_counts.items():
    print(f"   {key}: {orphans}")

# Fail loudly instead of relying on someone reading the printed numbers.
null_counts = null_check.first().asDict()
assert not any(null_counts.values()), f"Clés nulles dans fact : {null_counts}"
assert not any(orphan_counts.values()), f"Clés orphelines dans fact : {orphan_counts}"

StatementMeta(, e12e8862-1259-46d5-b450-922bf8380394, 16, Finished, Available, Finished, False)

===  Vérification intégrité référentielle ===

 Dates - Fait: 2, Dim: 2
 Turbines - Fait: 3, Dim: 3

 Clés nulles dans fact :
+-------+-------+----------+---------+
|date_id|time_id|turbine_id|status_id|
+-------+-------+----------+---------+
|      0|      0|         0|        0|
+-------+-------+----------+---------+

