# Silver → Gold Transformation

## 📋 Objective
Build a dimensional model (star schema) optimized for analysis.

## 🌟 Model created
- **4 dimensions**: Date, Time, Turbine, Operational Status
- **1 fact**: Wind power production

## 📦 Dependencies
- **Input**: LH_Wind_Power_Silver.dbo.wind_power
- **Output**: LH_Wind_Power_Gold (5 tables)

In [5]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

In [6]:
silver_table_path = "abfss://c5409968-520e-49dc-87fb-b95c69c590fb@onelake.dfs.fabric.microsoft.com/bb452b78-630f-4d64-af7c-14df79792a97/Tables/dbo/wind_power"

df = spark.read.format("delta").load(silver_table_path)

print(f"📊 Data loaded: {df.count()} rows")

📊 Data loaded: 432 rows


In [7]:
# Create dim_date from the distinct date attributes
date_dim = (df
    .select("date", "day", "month", "quarter", "year")
    .distinct()
    .withColumnRenamed("date", "date_id")
)

print(f"📅 dim_date: {date_dim.count()} unique dates")
date_dim.show(5)

📅 dim_date: 1 unique dates
+----------+---+-----+-------+----+
|   date_id|day|month|quarter|year|
+----------+---+-----+-------+----+
|2024-06-15| 15|    6|      2|2024|
+----------+---+-----+-------+----+



In [8]:
# Create dim_time from the distinct time attributes
time_dim = (df
    .select("time", "hour_of_day", "minute_of_hour", "second_of_minute", "time_period")
    .distinct()
    .withColumnRenamed("time", "time_id")
)

print(f"⏰ dim_time: {time_dim.count()} unique times")
time_dim.show(5)

⏰ dim_time: 144 unique times
+--------+-----------+--------------+----------------+-----------+
| time_id|hour_of_day|minute_of_hour|second_of_minute|time_period|
+--------+-----------+--------------+----------------+-----------+
|10:10:00|         10|            10|               0|    Morning|
|11:10:00|         11|            10|               0|    Morning|
|18:10:00|         18|            10|               0|    Evening|
|18:30:00|         18|            30|               0|    Evening|
|16:10:00|         16|            10|               0|  Afternoon|
+--------+-----------+--------------+----------------+-----------+
only showing top 5 rows



In [9]:
# Create dim_turbine with a sequential surrogate key (turbine_id)
turbine_dim = (df
    .select("turbine_name", "capacity", "location_name", "latitude", "longitude", "region")
    .distinct()
    .withColumn("turbine_id", 
        row_number().over(
            Window.orderBy("turbine_name", "capacity", "location_name")
        )
    )
)

print(f"🌬️ dim_turbine: {turbine_dim.count()} turbines")
turbine_dim.show()

🌬️ dim_turbine: 3 turbines
+------------+--------+-------------+--------+---------+--------+----------+
|turbine_name|capacity|location_name|latitude|longitude|  region|turbine_id|
+------------+--------+-------------+--------+---------+--------+----------+
|   Turbine A|    2200|   Location 1| 34.0522|-118.2437|Region A|         1|
|   Turbine B|    2000|   Location 2| 36.7783|-119.4179|Region B|         2|
|   Turbine C|    2500|   Location 3| 40.7128|  -74.006|Region C|         3|
+------------+--------+-------------+--------+---------+--------+----------+



In [10]:
# Create dim_operational_status with a sequential surrogate key (status_id)
operational_status_dim = (df
    .select("status", "responsible_department")
    .distinct()
    .withColumn("status_id", 
        row_number().over(
            Window.orderBy("status", "responsible_department")
        )
    )
)

print(f"📊 dim_operational_status: {operational_status_dim.count()} statuses")
operational_status_dim.show()

📊 dim_operational_status: 6 statuses
+--------------------+----------------------+---------+
|              status|responsible_department|status_id|
+--------------------+----------------------+---------+
|               Fault|           Engineering|        1|
|          Inspection|           Maintenance|        2|
|             Offline|            Operations|        3|
|              Online|            Operations|        4|
|Preventive Mainte...|           Maintenance|        5|
|Reactive Maintenance|           Maintenance|        6|
+--------------------+----------------------+---------+
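
The two `row_number()` cells above build surrogate keys by taking the distinct attribute combinations, sorting them, and numbering them from 1. The plain-Python sketch below mimics that logic on a few illustrative tuples; `assign_surrogate_keys` is a hypothetical helper, not part of the notebook. In Spark, a `Window.orderBy(...)` without `partitionBy` moves all rows to a single partition, which is acceptable for small dimensions like these but would not scale to large tables.

```python
def assign_surrogate_keys(rows):
    """Mimic distinct() + row_number() over an ordered window.

    rows: iterable of tuples (one tuple per source row).
    Returns a dict mapping each distinct tuple to a 1-based key.
    """
    distinct_sorted = sorted(set(rows))
    return {row: key for key, row in enumerate(distinct_sorted, start=1)}


# Illustrative (status, responsible_department) pairs; only a subset of the real data
source_rows = [
    ("Online", "Operations"),
    ("Fault", "Engineering"),
    ("Online", "Operations"),   # duplicate, collapsed by set()
    ("Inspection", "Maintenance"),
]

status_keys = assign_surrogate_keys(source_rows)
for row, key in sorted(status_keys.items(), key=lambda item: item[1]):
    print(key, row)
```

Because each key depends on the sort position within the full set of distinct values, a key can shift when a new value sorts ahead of existing ones. This is worth keeping in mind if the Gold tables are ever rebuilt from a larger Silver dataset.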



In [12]:
# Build the fact table
# Join the dimensions back onto the source rows to retrieve the foreign keys
df_with_keys = (df
    .join(turbine_dim, 
          ["turbine_name", "capacity", "location_name", "latitude", "longitude", "region"], 
          "left")
    .join(operational_status_dim, 
          ["status", "responsible_department"], 
          "left")
)

In [13]:
# Fact table containing only keys and measures
fact_table = (df_with_keys
    .select(
        "production_id",
        col("date").alias("date_id"),
        col("time").alias("time_id"),
        "turbine_id",
        "status_id",
        "wind_speed",
        "wind_direction",
        "energy_produced"
    )
)

print(f"📈 fact_wind_power: {fact_table.count()} measurements")
fact_table.show(10)

📈 fact_wind_power: 432 measurements
+-------------+----------+--------+----------+---------+----------+--------------+---------------+
|production_id|   date_id| time_id|turbine_id|status_id|wind_speed|wind_direction|energy_produced|
+-------------+----------+--------+----------+---------+----------+--------------+---------------+
|         6049|2024-06-15|00:00:00|         1|        4|      5.74|            SW|        1783.39|
|         6050|2024-06-15|00:00:00|         2|        2|     23.92|             E|            0.0|
|         6051|2024-06-15|00:00:00|         3|        4|      5.72|            NW|        1651.84|
|         6052|2024-06-15|00:10:00|         1|        4|     13.02|             E|        1351.88|
|         6053|2024-06-15|00:10:00|         2|        4|      8.77|             E|         931.99|
|         6054|2024-06-15|00:10:00|         3|        4|     23.31|             W|        1459.19|
|         6055|2024-06-15|00:20:00|         1|        4|      2.35|       

In [14]:
# List all tables available in the Spark catalog
spark.catalog.listTables()


[Table(name='wind_power', catalog='spark_catalog', namespace=['chimcobldhq2alr9dpi50rrnclp42rj1dhsn8qb3ecmj4c1i6kikoi2vedkmotj5e8im8ojf'], description=None, tableType='MANAGED', isTemporary=False)]

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
spark = SparkSession.builder.getOrCreate()

base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"

# Try to read the Gold root folder as a Delta table.
# A separate variable is used so the Silver DataFrame `df` is not overwritten.
try:
    gold_root_df = spark.read.format("delta").load(base_path)
    gold_root_df.show()
except Exception as e:
    print("Unable to read path:", e)


Unable to read path: [DELTA_TABLE_NOT_FOUND] Delta table `abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo` doesn't exist.
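
This error is expected: `base_path` is the parent folder of the tables, not a Delta table itself. A Delta table is a directory that contains a `_delta_log` subfolder, so listing tables amounts to listing the child folders that have one. The sketch below shows the idea on a local temporary directory using only the standard library. `list_delta_tables` is a hypothetical helper, and on OneLake the listing would have to go through a Fabric file-system utility rather than `pathlib` (an assumption, not verified here).

```python
import tempfile
from pathlib import Path


def list_delta_tables(root):
    """Return names of child folders of `root` that contain a _delta_log folder."""
    return sorted(
        child.name
        for child in Path(root).iterdir()
        if child.is_dir() and (child / "_delta_log").is_dir()
    )


# Build a throwaway folder layout that imitates Tables/dbo
with tempfile.TemporaryDirectory() as tmp:
    for name in ("dim_date", "dim_time"):
        (Path(tmp) / name / "_delta_log").mkdir(parents=True)
    (Path(tmp) / "not_a_table").mkdir()  # no _delta_log, so it is skipped

    found_tables = list_delta_tables(tmp)
    print(found_tables)
```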


In [16]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1️⃣ Get the schema from the existing DataFrame
schema = date_dim.schema

# 2️⃣ Create an empty DataFrame with that schema
empty_df = spark.createDataFrame([], schema)

# 3️⃣ Define the base path and the table path
base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
gold_date_dim_path = f"{base_path}/dim_date"

# 4️⃣ Write the empty Delta table
empty_df.write.format("delta").mode("overwrite").save(gold_date_dim_path)

print(f"✅ Table dim_date created (empty) at {gold_date_dim_path}")

# 5️⃣ Populate the table with the actual data, as done for the other tables
date_dim.write.format("delta").mode("append").save(gold_date_dim_path)

✅ Table dim_date created (empty) at abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo/dim_date


In [17]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1️⃣ Get the schema from the existing DataFrame
time_schema = time_dim.schema

# 2️⃣ Create an empty DataFrame with that schema
empty_time_df = spark.createDataFrame([], time_schema)

# 3️⃣ Define the table path
base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
gold_time_dim_path = f"{base_path}/dim_time"

# 4️⃣ Write the empty Delta table
empty_time_df.write.format("delta").mode("overwrite").save(gold_time_dim_path)

print(f"✅ Table dim_time created (empty) at {gold_time_dim_path}")

# 5️⃣ Populate the table with the actual data
time_dim.write.format("delta").mode("append").save(gold_time_dim_path)

✅ Table dim_time created (empty) at abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo/dim_time


In [18]:
spark = SparkSession.builder.getOrCreate()

# turbine_dim is already defined above with its sequential surrogate key
# 1️⃣ Get the schema
turbine_schema = turbine_dim.schema

# 2️⃣ Create an empty DataFrame with that schema
empty_turbine_df = spark.createDataFrame([], turbine_schema)

# 3️⃣ Define the table path
base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
gold_turbine_dim_path = f"{base_path}/dim_turbine"

# 4️⃣ Write the empty Delta table
empty_turbine_df.write.format("delta").mode("overwrite").save(gold_turbine_dim_path)
print(f"✅ Table dim_turbine created (empty) at {gold_turbine_dim_path}")

# 5️⃣ Populate the table with the actual data
turbine_dim.write.format("delta").mode("append").save(gold_turbine_dim_path)

✅ Table dim_turbine created (empty) at abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo/dim_turbine


In [19]:
spark = SparkSession.builder.getOrCreate()


# 1️⃣ Get the schema
status_schema = operational_status_dim.schema

# 2️⃣ Create an empty DataFrame with that schema
empty_status_df = spark.createDataFrame([], status_schema)

# 3️⃣ Define the table path
base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
gold_operational_status_dim_path = f"{base_path}/dim_operational_status"

# 4️⃣ Write the empty Delta table
empty_status_df.write.format("delta").mode("overwrite").save(gold_operational_status_dim_path)

print(f"✅ Table dim_operational_status created (empty) at {gold_operational_status_dim_path}")

# 5️⃣ Populate the table with the actual data
operational_status_dim.write.format("delta").mode("append").save(gold_operational_status_dim_path)

✅ Table dim_operational_status created (empty) at abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo/dim_operational_status


In [20]:
# Fact table

spark = SparkSession.builder.getOrCreate()

# 1️⃣ fact_table is already defined above

# 2️⃣ Get the schema
fact_schema = fact_table.schema

# 3️⃣ Create an empty DataFrame with that schema
empty_fact_df = spark.createDataFrame([], fact_schema)

# 4️⃣ Define the table path
base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
gold_fact_table_path = f"{base_path}/fact_wind_power"

# 5️⃣ Write the empty Delta table
empty_fact_df.write.format("delta").mode("overwrite").save(gold_fact_table_path)
print(f"✅ Table fact_wind_power created (empty) at {gold_fact_table_path}")

# 6️⃣ Populate the table with the actual data
fact_table.write.format("delta").mode("append").save(gold_fact_table_path)

✅ Table fact_wind_power created (empty) at abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo/fact_wind_power
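
The write cells repeat the same steps with a different DataFrame and table name each time. One way to factor this out, sketched here as an assumption rather than as the notebook's method, is a small helper. `write_gold_table` is a hypothetical name; it relies only on the `DataFrame.write.format(...).mode(...).save(...)` chain already used above, and it overwrites directly with the real data instead of writing an empty table first. The `_FakeDataFrame` stub exists only so that the sketch runs without a Spark session.

```python
def write_gold_table(df, base_path, table_name):
    """Overwrite the Delta table <base_path>/<table_name> with df and return its path."""
    table_path = f"{base_path.rstrip('/')}/{table_name}"
    df.write.format("delta").mode("overwrite").save(table_path)
    return table_path


class _FakeWriter:
    """Stand-in for a DataFrameWriter that records the calls it receives."""

    def __init__(self):
        self.calls = []

    def format(self, source):
        self.calls.append(("format", source))
        return self

    def mode(self, save_mode):
        self.calls.append(("mode", save_mode))
        return self

    def save(self, path):
        self.calls.append(("save", path))


class _FakeDataFrame:
    """Stand-in for a Spark DataFrame; it only exposes .write."""

    def __init__(self):
        self.write = _FakeWriter()


# Same base path as in the cells above
demo_base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
demo_df = _FakeDataFrame()
demo_path = write_gold_table(demo_df, demo_base_path, "dim_date")
print(demo_path)
print(demo_df.write.calls)
```

In the notebook itself the call would look like `write_gold_table(date_dim, base_path, "dim_date")`, repeated for the other four tables.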


In [22]:
#base_path = "abfss://WindPowerAnalytics-2025@onelake.dfs.fabric.microsoft.com/LH_gold.Lakehouse/Tables/dbo"
#gold_date_dim_path = f"{base_path}/dim_date"
#gold_time_dim_path = f"{base_path}/dim_time"
#gold_turbine_dim_path = f"{base_path}/dim_turbine"
#gold_operational_status_dim_path = f"{base_path}/dim_operational_status"
#gold_fact_table_path = f"{base_path}/fact_wind_power"

In [23]:
from pyspark.sql.functions import count, when, col

print("=== 🔍 Referential integrity check ===\n")

# Compare the number of distinct date keys in the fact table and the dimension
fact_dates = fact_table.select("date_id").distinct().count()
dim_dates = date_dim.count()
print(f"📅 Dates - Fact: {fact_dates}, Dim: {dim_dates}")

# Compare the number of distinct turbine keys in the fact table and the dimension
fact_turbines = fact_table.select("turbine_id").distinct().count()
dim_turbines = turbine_dim.count()
print(f"🌬️ Turbines - Fact: {fact_turbines}, Dim: {dim_turbines}")

# Count null values in each key column
null_check = fact_table.select([
    count(when(col(c).isNull(), c)).alias(c)
    for c in ["date_id", "time_id", "turbine_id", "status_id"]
])
print("\n❌ Null keys in fact:")
null_check.show()

=== 🔍 Referential integrity check ===

📅 Dates - Fact: 1, Dim: 1
🌬️ Turbines - Fact: 3, Dim: 3

❌ Null keys in fact:
+-------+-------+----------+---------+
|date_id|time_id|turbine_id|status_id|
+-------+-------+----------+---------+
|      0|      0|         0|        0|
+-------+-------+----------+---------+
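
Comparing distinct counts only shows that the fact table and a dimension hold the same number of keys, not that they hold the same keys. A stricter check looks for fact keys that have no match in the dimension (orphans). The sketch below shows that logic with plain Python sets on made-up values; `find_orphan_keys` is a hypothetical helper. In Spark, the equivalent would be a `left_anti` join of the fact table against the dimension on the key column.

```python
def find_orphan_keys(fact_keys, dim_keys):
    """Return the sorted fact keys that are absent from the dimension."""
    return sorted(set(fact_keys) - set(dim_keys))


# Made-up keys for illustration: both sides have three distinct keys,
# yet one fact key (4) has no matching dimension row.
fact_turbine_ids = [1, 2, 4, 1, 2]
dim_turbine_ids = [1, 2, 3]

orphans = find_orphan_keys(fact_turbine_ids, dim_turbine_ids)
print(f"Distinct fact keys: {len(set(fact_turbine_ids))}, dim keys: {len(dim_turbine_ids)}")
print(f"Orphan keys: {orphans}")
```

An empty orphan list for every key column, together with the zero null counts above, would give stronger evidence of referential integrity than matching counts alone.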

