In [1]:
!pip install pyspark



In [7]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 1. Créer une session Spark
spark = SparkSession.builder \
    .appName("Pipeline PySpark Test") \
    .master("local[*]") \
    .getOrCreate()

# Lire inverter_yields.csv
df_inverter_yields = spark.read.csv("inverter_yields.csv", header=True, inferSchema=True)

# Lire site_median_reference.csv
df_site_median = spark.read.csv("site_median_reference.csv", header=True, inferSchema=True)

# Lire sldc_events.csv
df_sldc_events = spark.read.csv("sldc_events.csv", header=True, inferSchema=True)

# Lire static_inverter_info.csv
df_static_inverter = spark.read.csv("static_inverter_info.csv", header=True, inferSchema=True)

#df_inverter_yields.show(5)
#df_site_median.show(5)
#df_sldc_events.show(5)
#df_static_inverter.show(5)



# Question 2

In [10]:
# Joints inverter_yields avec :static_inverter_info via logical_device_mrid
df_joined = df_inverter_yields.join(
    df_static_inverter,
    on="logical_device_mrid"  # colonne commune
    #how="inner"                # inner join
)
df_joined.show(5)

+-------------------+-------------------+-------------------+------------+-----------------+-------------------+------------+----------------------+---------------------+------------+
|logical_device_mrid|           ts_start|             ts_end|project_code|specific_yield_ac|reference_yield_stc|project_mrid|inverter_function_type|storage_inverter_type|ac_max_power|
+-------------------+-------------------+-------------------+------------+-----------------+-------------------+------------+----------------------+---------------------+------------+
|             INV_A1|2025-06-01 00:00:00|2025-06-01 00:10:00|      SITE_A|            1.188|              1.095|       PRJ_A|                    PV|           DC-Coupled|       15000|
|             INV_A2|2025-06-01 00:00:00|2025-06-01 00:10:00|      SITE_A|            1.014|              1.337|       PRJ_A|                    PV|                 NULL|       16000|
|             INV_B1|2025-06-01 00:00:00|2025-06-01 00:10:00|      SITE_B|      

In [14]:
# Joints inverter_yields avec : sldc_events en chevauchement temporel
from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

# Convertir les colonnes en timestamp
df_inverter_yields = df_inverter_yields.withColumn("ts_start", col("ts_start").cast(TimestampType()))
df_inverter_yields = df_inverter_yields.withColumn("ts_end", col("ts_end").cast(TimestampType()))

df_sldc_events = df_sldc_events.withColumn("ts_start", col("ts_start").cast(TimestampType()))
df_sldc_events = df_sldc_events.withColumn("ts_end", col("ts_end").cast(TimestampType()))

df_time_joined = df_inverter_yields.join(
    df_sldc_events,
    (df_inverter_yields.ts_start <= df_sldc_events.ts_end) &
    (df_sldc_events.ts_start <= df_inverter_yields.ts_end)
    #how="inner"
)
df_time_joined.show(5)
df_time_joined.printSchema()



+-------------------+-------------------+-------------------+------------+-----------------+-------------------+------------+-------------------+-------------------+-------------------+--------------------+
|           ts_start|             ts_end|logical_device_mrid|project_code|specific_yield_ac|reference_yield_stc|project_mrid|logical_device_mrid|           ts_start|             ts_end|iec63019_category_id|
+-------------------+-------------------+-------------------+------------+-----------------+-------------------+------------+-------------------+-------------------+-------------------+--------------------+
|2025-06-01 00:00:00|2025-06-01 00:10:00|             INV_A1|      SITE_A|            1.188|              1.095|       PRJ_A|             INV_A1|2025-06-01 00:00:00|2025-06-01 00:20:00|     FULL_CAPABILITY|
|2025-06-01 00:00:00|2025-06-01 00:10:00|             INV_A1|      SITE_A|            1.188|              1.095|       PRJ_A|             INV_A2|2025-06-01 00:00:00|2025-06

In [17]:
#Joints inverter_yields avec :site_median_reference sur project_code et ts_start

from pyspark.sql.functions import col
from pyspark.sql.types import TimestampType

# Conversion ts_start pour les deux DataFrames
df_inverter_yields = df_inverter_yields.withColumn("ts_start", col("ts_start").cast(TimestampType()))
df_site_median = df_site_median.withColumn("ts_start", col("ts_start").cast(TimestampType()))

df_median_joined = df_inverter_yields.join(
    df_site_median,
    on=["project_code", "ts_start"],  # jointure sur deux colonnes
    how="inner"                       # inner join
)

# verify result
df_median_joined.show(5)
df_median_joined.printSchema()



+------------+-------------------+-------------------+-------------------+-----------------+-------------------+------------+--------------------------+
|project_code|           ts_start|             ts_end|logical_device_mrid|specific_yield_ac|reference_yield_stc|project_mrid|site_median_specific_yield|
+------------+-------------------+-------------------+-------------------+-----------------+-------------------+------------+--------------------------+
|      SITE_A|2025-06-01 00:00:00|2025-06-01 00:10:00|             INV_A1|            1.188|              1.095|       PRJ_A|                       1.1|
|      SITE_A|2025-06-01 00:00:00|2025-06-01 00:10:00|             INV_A2|            1.014|              1.337|       PRJ_A|                       1.1|
|      SITE_B|2025-06-01 00:00:00|2025-06-01 00:10:00|             INV_B1|            0.843|              1.077|       PRJ_B|                       1.2|
+------------+-------------------+-------------------+-------------------+--------

In [19]:
# Calcule potential_production = specific_yield_ac × ac_max_power × 1/6 (10min en heures)

from pyspark.sql.functions import col

df_joined = df_joined.withColumn(
    "potential_production",
    col("specific_yield_ac") * col("ac_max_power") * (1/6)
)
#verify result
df_joined.select("logical_device_mrid", "specific_yield_ac", "ac_max_power", "potential_production").show(5)



+-------------------+-----------------+------------+--------------------+
|logical_device_mrid|specific_yield_ac|ac_max_power|potential_production|
+-------------------+-----------------+------------+--------------------+
|             INV_A1|            1.188|       15000|              2970.0|
|             INV_A2|            1.014|       16000|              2704.0|
|             INV_B1|            0.843|       14000|              1967.0|
|             INV_A1|            1.088|       15000|              2720.0|
|             INV_A2|             1.03|       16000|  2746.6666666666665|
+-------------------+-----------------+------------+--------------------+
only showing top 5 rows



In [20]:
# Ne conserve que les inverters "PV" qui ne sont pas "AC-Coupled"

from pyspark.sql.functions import col

df_filtered = df_joined.filter(
    (col("inverter_function_type") == "PV") &
    (col("storage_inverter_type") != "AC-Coupled")
)
df_filtered.select("logical_device_mrid", "inverter_function_type", "storage_inverter_type").show(5)


+-------------------+----------------------+---------------------+
|logical_device_mrid|inverter_function_type|storage_inverter_type|
+-------------------+----------------------+---------------------+
|             INV_A1|                    PV|           DC-Coupled|
|             INV_A1|                    PV|           DC-Coupled|
|             INV_A1|                    PV|           DC-Coupled|
|             INV_A1|                    PV|           DC-Coupled|
|             INV_A1|                    PV|           DC-Coupled|
+-------------------+----------------------+---------------------+
only showing top 5 rows



In [22]:
# Produit un fichier parquet partitionné par project_code et year_month
from pyspark.sql.functions import date_format

# Créer la colonne year_month à partir de ts_start
from pyspark.sql.functions import date_format, col

df_final = df_filtered.withColumn("year_month", date_format(col("ts_start"), "yyyy-MM"))

# Écrire le fichier Parquet partitionné à la racine de Colab
df_final.write.mode("overwrite") \
    .partitionBy("project_code", "year_month") \
    .parquet("/content/parquet_data")


In [23]:
!ls /content/parquet_data

'project_code=SITE_A'   _SUCCESS


In [25]:
# Test spark
from pyspark.sql.functions import col

def run_tests(df):
    """
    Exécute des tests de validation sur le DataFrame final du pipeline PySpark.
    """
    # Vérifier que le DataFrame n'est pas vide
    assert df.count() > 0, "Le DataFrame final est vide !"

    # Vérifier la présence des colonnes importantes
    expected_columns = [
        "logical_device_mrid",
        "ts_start",
        "ts_end",
        "specific_yield_ac",
        "ac_max_power",
        "potential_production",
        "inverter_function_type",
        "storage_inverter_type"
    ]
    for col_name in expected_columns:
        assert col_name in df.columns, f"La colonne {col_name} est manquante dans le DataFrame final"

    # Vérifier que potential_production > 0
    zero_potential = df.filter(col("potential_production") <= 0).count()
    assert zero_potential == 0, f"Il y a {zero_potential} lignes avec potential_production <= 0"

    # Vérifier le filtrage PV non AC-Coupled
    non_pv = df.filter(col("inverter_function_type") != "PV").count()
    ac_coupled = df.filter(col("storage_inverter_type") == "AC-Coupled").count()
    assert non_pv == 0, f"Il y a {non_pv} inverters qui ne sont pas PV"
    assert ac_coupled == 0, f"Il y a {ac_coupled} inverters AC-Coupled"

    # Vérifier la présence de partitions project_code et year_month
    if "project_code" in df.columns and "year_month" in df.columns:
        partitions = df.select("project_code", "year_month").distinct().count()
        assert partitions > 0, "Aucune partition project_code/year_month détectée"

    print("Tous les tests Spark ont été passés avec succès !")


In [26]:
run_tests(df_final)

Tous les tests Spark ont été passés avec succès !


In [37]:
# Pipeline pyspark final, complet et portable
!pip install pyspark

# --------------------------
# Importer PySpark et fonctions
# --------------------------
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# --------------------------
# Paramètres
# --------------------------
CSV_FILES = {
    "inverter_yields": "inverter_yields.csv",
    "static_inverter_info": "static_inverter_info.csv",
    "sldc_events": "sldc_events.csv",
    "site_median_reference": "site_median_reference.csv"
}

OUTPUT_PATH = "/content/parquet_data"

# --------------------------
# Initialiser Spark
# --------------------------
spark = SparkSession.builder \
    .appName("InverterPipeline") \
    .master("local[*]") \
    .getOrCreate()

# --------------------------
# Lire les CSV
# --------------------------
df_inverter_yields = spark.read.csv(CSV_FILES["inverter_yields"], header=True, inferSchema=True)
df_static_inverter = spark.read.csv(CSV_FILES["static_inverter_info"], header=True, inferSchema=True)
df_sldc_events = spark.read.csv(CSV_FILES["sldc_events"], header=True, inferSchema=True)
df_site_median = spark.read.csv(CSV_FILES["site_median_reference"], header=True, inferSchema=True)

# --------------------------
# Convertir colonnes timestamp
# --------------------------
df_inverter_yields = df_inverter_yields.withColumn("ts_start_inv", F.col("ts_start").cast("timestamp")) \
                                       .withColumn("ts_end_inv", F.col("ts_end").cast("timestamp"))

df_sldc_events = df_sldc_events.withColumn("ts_start_event", F.col("ts_start").cast("timestamp")) \
                               .withColumn("ts_end_event", F.col("ts_end").cast("timestamp"))

df_site_median = df_site_median.withColumn("ts_start_median", F.col("ts_start").cast("timestamp"))

# --------------------------
# Renommer colonnes static_inverter
# --------------------------
df_static_inverter = df_static_inverter.withColumnRenamed("logical_device_mrid", "logical_device_mrid_static")

# --------------------------
# Jointures avec alias pour éviter ambiguïté
# --------------------------

# 7.1 inverter_yields + static_inverter_info
left = df_inverter_yields.alias("inv")
right = df_static_inverter.alias("stat")

df_joined = left.join(
    right,
    F.col("inv.logical_device_mrid") == F.col("stat.logical_device_mrid_static"),
    "inner"
).select(
    F.col("inv.logical_device_mrid").alias("logical_device_mrid"),
    F.col("inv.ts_start_inv"),
    F.col("inv.ts_end_inv"),
    F.col("inv.specific_yield_ac"),
    F.col("inv.project_code"),
    F.col("stat.ac_max_power"),
    F.col("stat.inverter_function_type"),
    F.col("stat.storage_inverter_type")
)

# 7.2 Jointure temporelle avec sldc_events
left = df_joined.alias("left")
right = df_sldc_events.alias("right")

df_joined2 = left.join(
    right,
    (F.col("left.ts_start_inv") <= F.col("right.ts_end_event")) &
    (F.col("right.ts_start_event") <= F.col("left.ts_end_inv")),
    "inner"
).select(
    F.col("left.logical_device_mrid"),
    F.col("left.ts_start_inv"),
    F.col("left.ts_end_inv"),
    F.col("left.specific_yield_ac"),
    F.col("left.project_code"),
    F.col("left.ac_max_power"),
    F.col("left.inverter_function_type"),
    F.col("left.storage_inverter_type")
)

# 7.3 Jointure avec site_median_reference sur project_code et ts_start
left = df_joined2.alias("left")
right = df_site_median.alias("right")

df_joined3 = left.join(
    right,
    (F.col("left.project_code") == F.col("right.project_code")) &
    (F.col("left.ts_start_inv") == F.col("right.ts_start_median")),
    "inner"
).select(
    F.col("left.logical_device_mrid"),
    F.col("left.ts_start_inv"),
    F.col("left.ts_end_inv"),
    F.col("left.specific_yield_ac"),
    F.col("left.project_code"),
    F.col("left.ac_max_power"),
    F.col("left.inverter_function_type"),
    F.col("left.storage_inverter_type")
)

# --------------------------
# Calcul potential_production
# --------------------------
df_joined3 = df_joined3.withColumn(
    "potential_production",
    F.col("specific_yield_ac") * F.col("ac_max_power") * (1/6)
)

# --------------------------
# Filtrer PV non AC-Coupled
# --------------------------
df_filtered = df_joined3.filter(
    (F.col("inverter_function_type") == "PV") &
    (F.col("storage_inverter_type") != "AC-Coupled")
)

# --------------------------
# Créer year_month et sélectionner colonnes finales
# --------------------------
df_final_clean = df_filtered.withColumn(
    "year_month", F.date_format(F.col("ts_start_inv"), "yyyy-MM")
).select(
    F.col("logical_device_mrid"),
    F.col("ts_start_inv").alias("ts_start"),
    F.col("ts_end_inv").alias("ts_end"),
    F.col("specific_yield_ac"),
    F.col("ac_max_power"),
    F.col("potential_production"),
    F.col("inverter_function_type"),
    F.col("storage_inverter_type"),
    F.col("project_code"),
    F.col("year_month")
)

# --------------------------
#  Écrire Parquet partitionné
# --------------------------
df_final_clean.write.mode("overwrite").partitionBy("project_code", "year_month").parquet(OUTPUT_PATH)

# --------------------------
# Fonction de tests Spark
# --------------------------
def run_tests(df):
    assert df.count() > 0, "Le DataFrame final est vide !"
    expected_columns = [
        "logical_device_mrid","ts_start","ts_end","specific_yield_ac",
        "ac_max_power","potential_production","inverter_function_type",
        "storage_inverter_type","project_code","year_month"
    ]
    for col_name in expected_columns:
        assert col_name in df.columns, f"La colonne {col_name} est manquante"
    zero_potential = df.filter(F.col("potential_production") <= 0).count()
    assert zero_potential == 0, f"Il y a {zero_potential} lignes avec potential_production <= 0"
    non_pv = df.filter(F.col("inverter_function_type") != "PV").count()
    ac_coupled = df.filter(F.col("storage_inverter_type") == "AC-Coupled").count()
    assert non_pv == 0, f"Il y a {non_pv} inverters qui ne sont pas PV"
    assert ac_coupled == 0, f"Il y a {ac_coupled} inverters AC-Coupled"
    partitions = df.select("project_code", "year_month").distinct().count()
    assert partitions > 0, "Aucune partition project_code/year_month détectée"
    print("✅ Tous les tests Spark ont été passés avec succès !")

# --------------------------
# Lancer les tests
# --------------------------
run_tests(df_final_clean)


✅ Tous les tests Spark ont été passés avec succès !
