# Notebook : Ingestion quotidienne de donn√©es √©oliennes

## üìã Objectif
Charger de mani√®re incr√©mentale les donn√©es de production √©olienne depuis GitHub vers le Lakehouse Bronze.

## üîÑ Logique
1. Identifier la date la plus r√©cente dans Bronze
2. Calculer le jour suivant (date + 1)
3. T√©l√©charger le fichier CSV correspondant depuis GitHub
4. Ajouter les nouvelles donn√©es en mode append

## ‚öôÔ∏è Ex√©cution
- **Fr√©quence recommand√©e** : Quotidienne (via pipeline)
- **Dur√©e moyenne** : 50-60 secondes
- **D√©pendances** : Repository GitHub public accessible

## üì¶ D√©pendances
- Lakehouse : LH_Wind_Power_Bronze
- Source : https://github.com/gsoulat/data-training-fabric/tree/main/eolienne

In [11]:
import requests
import pandas as pd
from datetime import datetime, timedelta

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 13, Finished, Available, Finished)

In [12]:
# URL de base du repository GitHub
base_url = "https://raw.githubusercontent.com/gsoulat/data-training-fabric/main/eolienne/"

# Chemin vers la table Bronze
bronze_table_path = "abfss://WindPowerAnalytics_Vincent@onelake.dfs.fabric.microsoft.com/LH_Wind_Power_Bronze.Lakehouse/Tables/dbo/wind_power"

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 14, Finished, Available, Finished)

In [13]:
# # ‚ö†Ô∏è CELLULE D'INITIALISATION - √Ä ex√©cuter UNE SEULE FOIS
# # Cette cellule cr√©e la table Bronze avec le premier fichier de donn√©es

# initial_date = "20240615"  # Premier jour disponible
# initial_url = f"{base_url}{initial_date}_wind_power_data.csv"

# print(f"üì• T√©l√©chargement du fichier initial : {initial_url}")

# # T√©l√©charger et charger le premier fichier
# df_initial = pd.read_csv(initial_url)
# df_initial['date'] = pd.to_datetime(df_initial['date'])

# print(f"‚úÖ Fichier charg√© : {len(df_initial)} lignes")
# print(f"üìä Aper√ßu des donn√©es :")
# print(df_initial.head())

# # Convertir en Spark DataFrame et sauvegarder
# df_spark_initial = spark.createDataFrame(df_initial)
# df_spark_initial.write.format('delta').mode("overwrite").save(bronze_table_path)

# print("‚úÖ Table Bronze initialis√©e avec succ√®s !")

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 15, Finished, Available, Finished)

In [14]:
# V√©rifier que la table existe et afficher quelques statistiques
df_check = spark.read.format("delta").load(bronze_table_path)

print(f"üìä Nombre total de lignes : {df_check.count()}")
print(f"üìÖ Colonnes disponibles : {df_check.columns}")
print(f"üîç Aper√ßu des 5 premi√®res lignes :")
df_check.show(5)

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 16, Finished, Available, Finished)

üìä Nombre total de lignes : 864
üìÖ Colonnes disponibles : ['production_id', 'date', 'time', 'turbine_name', 'capacity', 'location_name', 'latitude', 'longitude', 'region', 'status', 'responsible_department', 'wind_speed', 'wind_direction', 'energy_produced']
üîç Aper√ßu des 5 premi√®res lignes :
+-------------+-------------------+--------+------------+--------+-------------+--------+---------+--------+------+----------------------+----------+--------------+---------------+
|production_id|               date|    time|turbine_name|capacity|location_name|latitude|longitude|  region|status|responsible_department|wind_speed|wind_direction|energy_produced|
+-------------+-------------------+--------+------------+--------+-------------+--------+---------+--------+------+----------------------+----------+--------------+---------------+
|         6481|2024-06-16 00:00:00|00-00-00|   Turbine A|    2200|   Location 1| 34.0522|-118.2437|Region A|Online|            Operations|  17.49365|      

In [15]:
# Charger les donn√©es existantes depuis Bronze
df_spark = spark.read.format("delta").load(bronze_table_path)

# Convertir en Pandas pour manipulation plus facile
df_pandas = df_spark.toPandas()

print(f"üìä Donn√©es actuelles dans Bronze : {len(df_pandas)} lignes")

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 17, Finished, Available, Finished)

üìä Donn√©es actuelles dans Bronze : 864 lignes


In [16]:
# Trouver la date la plus r√©cente dans les donn√©es
most_recent_date = pd.to_datetime(df_pandas['date'], format="%Y%m%d").max()

# Calculer le jour suivant
next_date = (most_recent_date + timedelta(days=1)).strftime("%Y%m%d")

print(f"üìÖ Date la plus r√©cente : {most_recent_date.strftime('%Y-%m-%d')}")
print(f"‚û°Ô∏è  Prochaine date √† charger : {next_date}")

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 18, Finished, Available, Finished)

üìÖ Date la plus r√©cente : 2024-06-16
‚û°Ô∏è  Prochaine date √† charger : 20240617


In [17]:
# Construire l'URL du fichier CSV
file_url = f"{base_url}{next_date}_wind_power_data.csv"
print(f"üåê URL du fichier : {file_url}")

try:
    # T√©l√©charger le CSV depuis GitHub
    df_pandas_new = pd.read_csv(file_url)
    
    # Convertir la colonne date en datetime
    df_pandas_new['date'] = pd.to_datetime(df_pandas_new['date'])
    
    print(f"‚úÖ Nouvelles donn√©es t√©l√©charg√©es : {len(df_pandas_new)} lignes")
    print(f"üìä Aper√ßu des nouvelles donn√©es :")
    print(df_pandas_new.head())
    
except Exception as e:
    print(f"‚ùå Erreur lors du t√©l√©chargement : {e}")
    print(f"üí° Cela peut signifier que le fichier pour la date {next_date} n'existe pas encore.")

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 19, Finished, Available, Finished)

üåê URL du fichier : https://raw.githubusercontent.com/gsoulat/data-training-fabric/main/eolienne/20240617_wind_power_data.csv
‚úÖ Nouvelles donn√©es t√©l√©charg√©es : 432 lignes
üìä Aper√ßu des nouvelles donn√©es :
   production_id       date      time turbine_name  capacity location_name  \
0           6913 2024-06-17  00-00-00    Turbine A      2200    Location 1   
1           6914 2024-06-17  00-00-00    Turbine B      2000    Location 2   
2           6915 2024-06-17  00-00-00    Turbine C      2500    Location 3   
3           6916 2024-06-17  00-10-00    Turbine A      2200    Location 1   
4           6917 2024-06-17  00-10-00    Turbine B      2000    Location 2   

   latitude  longitude    region  status responsible_department  wind_speed  \
0   34.0522  -118.2437  Region A  Online             Operations     7.73222   
1   36.7783  -119.4179  Region B  Online             Operations    19.67376   
2   40.7128   -74.0060  Region C  Online             Operations    23.32255 

In [18]:
# Convertir le DataFrame Pandas en Spark DataFrame
df_spark_new = spark.createDataFrame(df_pandas_new, schema=df_spark.schema)

# Ajouter les nouvelles donn√©es √† la table Bronze (mode append)
df_spark_new.write.format('delta').mode("append").save(bronze_table_path)

print("‚úÖ Donn√©es ajout√©es avec succ√®s dans le Lakehouse Bronze")
print(f"üìä Total apr√®s ajout : {spark.read.format('delta').load(bronze_table_path).count()} lignes")

StatementMeta(, 2d461eb1-0999-4cce-ad35-540b4018191c, 20, Finished, Available, Finished)

‚úÖ Donn√©es ajout√©es avec succ√®s dans le Lakehouse Bronze
üìä Total apr√®s ajout : 1296 lignes
