## PARTIE 2 : SPARK

In [3]:
def _build_spark_session(
    app_name, driver_cores, driver_mem, max_executors, executor_cores,
    executor_mem, queue
):
    """Build Spark session."""
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.master", "yarn")
        .config("spark.submit.deployMode", "client")
        .config("spark.driver.cores", driver_cores)
        .config("spark.driver.memory", driver_mem)
        .config("spark.executor.cores", executor_cores)
        .config("spark.executor.memory", executor_mem)
        .config("spark.shuffle.service.enabled", True)
        .config("spark.dynamicAllocation.enabled", True)
        .config("spark.dynamicAllocation.minExecutors", 0)
        .config("spark.dynamicAllocation.maxExecutors", max_executors)
        .config("spark.executor.memoryOverhead", 2048)
        .config("spark.driver.memoryOverhead", 1024)
        .config("spark.yarn.queue", queue)
        # .config("spark.sql.session.timeZone", "UTC")
        .config("spark.driver.extraClassPath", "/soft/ora1210/db/jdbc/lib/ojdbc6.jar")
        .config("spark.executor.extraClassPath", "/soft/ora1210/db/jdbc/lib/ojdbc6.jar")
        .getOrCreate()
    )


In [4]:
from pyspark.sql import SparkSession

spark_session = SparkSession.builder\
        .appName("app_name") \
        .getOrCreate()

In [5]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

data_spark = spark_session.read.csv("/home/jovyan/code/dataset_sample_spark.csv",
                                     header=True, 
                                     inferSchema=True,
                                     sep=';')

In [6]:
# Suppresiion des doublons 
data_spark = data_spark.dropDuplicates()

In [7]:
data_spark.show()

+-----------+------------+---------+-----------+----------+-------------+--------------+----------------------------+
|     sensor|servicePoint|   client|transmitter|      date|meterDiameter|volume_l_value|index_interpolated_d_l_value|
+-----------+------------+---------+-----------+----------+-------------+--------------+----------------------------+
|C04AE134021| 9,84588E+11|LYONNAISE|   C01E00D9|14/11/2020|           40|           5.0|                   6672885.0|
|C07AA092853| 9,89432E+11|LYONNAISE|11A000060C8|22/01/2019|           15|           0.0|                   2924425.0|
|C07AA092853| 9,89432E+11|LYONNAISE|11A000060C8|04/01/2020|           15|         269.0|                   3363479.0|
|C07AA092853| 9,89432E+11|LYONNAISE|11A000060C8|27/02/2020|           15|           0.0|                   3367554.0|
|C08EB001382| 9,85226E+11|LYONNAISE|   C24F74E5|02/04/2019|           20|         304.0|                    479962.0|
|C08EB001382| 9,85226E+11|LYONNAISE|   C24F74E5|23/04/20

In [8]:
# Convertir la colonne 'date' de type string en DateType
data_spark = data_spark.withColumn("date", F.to_date("date", "dd/MM/yyyy"))

In [9]:
data_spark.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- servicePoint: string (nullable = true)
 |-- client: string (nullable = true)
 |-- transmitter: string (nullable = true)
 |-- date: date (nullable = true)
 |-- meterDiameter: integer (nullable = true)
 |-- volume_l_value: double (nullable = true)
 |-- index_interpolated_d_l_value: double (nullable = true)



In [10]:
# Vérification d'unicité de la triplette (sensor, servicePoint, transmitter) pour une date donnée
window_spec = Window.partitionBy("sensor", "servicePoint", "transmitter")
counts = data_spark.withColumn("count", F.lit(1)).groupBy("sensor", "servicePoint", "transmitter").count()
non_unique = counts.filter(F.col("count") > 1)
if non_unique.count() > 0:
    print("Attention : Des doublons existent pour certaines triplettes.")
    data_spark = data_spark.dropDuplicates(["sensor", "servicePoint", "transmitter", "date"])

Attention : Des doublons existent pour certaines triplettes.


In [12]:
data_spark.printSchema()

root
 |-- sensor: string (nullable = true)
 |-- servicePoint: string (nullable = true)
 |-- client: string (nullable = true)
 |-- transmitter: string (nullable = true)
 |-- date: date (nullable = true)
 |-- meterDiameter: integer (nullable = true)
 |-- volume_l_value: double (nullable = true)
 |-- index_interpolated_d_l_value: double (nullable = true)



In [14]:
max_date = data_spark.select(F.max("date").alias("max_date")).collect()[0]["max_date"]
max_date

datetime.date(2021, 8, 25)

In [15]:
data_spark = data_spark.withColumn(
    "is_significant", 
    F.when(F.col("volume_l_value") > 20, F.lit(1)).otherwise(F.lit(0))
)

In [18]:
# Définir une fenêtre pour partitionner par triplette et ordonner par date
window_spec = Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy(F.desc("date"))

In [19]:
# Trouver la dernière date avec une consommation significative
data_spark = data_spark.withColumn(
    "last_significant_date",
    F.when(F.col("is_significant") == 1, F.col("date"))
).withColumn(
    "last_significant_date",
    F.last("last_significant_date", ignorenulls=True).over(window_spec)
)

In [21]:
data_spark.show(100)

+-----------+------------+---------+-----------+----------+-------------+-----------------+----------------------------+--------------+---------------------+
|     sensor|servicePoint|   client|transmitter|      date|meterDiameter|   volume_l_value|index_interpolated_d_l_value|is_significant|last_significant_date|
+-----------+------------+---------+-----------+----------+-------------+-----------------+----------------------------+--------------+---------------------+
|C04AE134021| 9,84588E+11|LYONNAISE|   C01E00D9|2021-08-08|           40|              0.0|                   7070442.0|             0|                 NULL|
|C04AE134021| 9,84588E+11|LYONNAISE|   C01E00D9|2021-08-07|           40|              0.0|                   7070442.0|             0|                 NULL|
|C04AE134021| 9,84588E+11|LYONNAISE|   C01E00D9|2021-08-06|           40|              0.0|                   7070442.0|             0|                 NULL|
|C04AE134021| 9,84588E+11|LYONNAISE|   C01E00D9|2021

In [22]:
# Calculer v1 comme la différence entre la date maximale (T) et la dernière date significative
data_spark = data_spark.withColumn(
    "v1",
    F.datediff(F.lit(max_date), F.col("last_significant_date"))
)


In [30]:
# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "date", "v1").show()

+-----------+------------+-----------+----------+----+
|     sensor|servicePoint|transmitter|      date|  v1|
+-----------+------------+-----------+----------+----+
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-08|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-07|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-06|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-05|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-04|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-03|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-02|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-08-01|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-31|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-30|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-29|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-28|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-27|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-26|NULL|
|C04AE134021| 9,84588E+11|   C01E00D9|2021-07-25|NULL|
|C04AE1340

In [25]:
# Afficher les valeurs uniques de v1
unique_v1 = data_spark.select("v1").distinct().orderBy("v1")
unique_v1.show(truncate=False)

+----+
|v1  |
+----+
|NULL|
|31  |
|42  |
|47  |
|48  |
|49  |
|50  |
|51  |
|52  |
|53  |
|54  |
|55  |
|56  |
|57  |
|58  |
|59  |
|60  |
|61  |
|62  |
|63  |
+----+
only showing top 20 rows



In [27]:
# Afficher la dernière date avec une consommation significative
data_spark.select("last_significant_date").distinct().orderBy("last_significant_date").show(truncate=False)

+---------------------+
|last_significant_date|
+---------------------+
|NULL                 |
|2019-01-01           |
|2019-01-02           |
|2019-01-03           |
|2019-01-04           |
|2019-01-05           |
|2019-01-06           |
|2019-01-07           |
|2019-01-08           |
|2019-01-09           |
|2019-01-10           |
|2019-01-11           |
|2019-01-12           |
|2019-01-13           |
|2019-01-14           |
|2019-01-15           |
|2019-01-16           |
|2019-01-17           |
|2019-01-18           |
|2019-01-19           |
+---------------------+
only showing top 20 rows



In [34]:
data_spark = data_spark.withColumn(
    "T0", F.last("last_significant_date", ignorenulls=True).over(window_spec)
)


In [40]:
# Filtrer les 90 jours avant T0
data_spark = data_spark.withColumn(
    "within_90_days",
    F.when((F.col("date") <= F.col("T0")) & (F.col("date") > F.date_sub(F.col("T0"), 90)), 1).otherwise(0)
)

# Calculer le nombre de jours avec consommation non nulle dans les 90 jours avant T0
data_spark = data_spark.withColumn(
    "non_null_days",
    F.when((F.col("within_90_days") == 1) & (F.col("volume_l_value") > 0), 1).otherwise(0)
)
data_spark = data_spark.withColumn(
    "total_non_null_days", F.sum("non_null_days").over(window_spec)
)

In [41]:
# Calculer v2
data_spark = data_spark.withColumn(
    "v2", F.col("total_non_null_days") / 90
)

In [45]:
# Résultat
data_spark.select("v2").distinct().show()

+-------------------+
|                 v2|
+-------------------+
| 7.2444444444444445|
|  7.533333333333333|
|  7.711111111111111|
|  7.866666666666666|
|  9.688888888888888|
|0.18888888888888888|
| 1.2666666666666666|
| 1.7444444444444445|
| 2.5444444444444443|
|  4.877777777777778|
| 1.5222222222222221|
|                2.4|
|  9.811111111111112|
| 1.9666666666666666|
| 2.1555555555555554|
|  4.055555555555555|
|  5.044444444444444|
|                8.0|
| 1.5888888888888888|
|                0.0|
+-------------------+
only showing top 20 rows



In [51]:
# Définir la période de 6 mois (183 jours) avant T0 - 90 jours
data_spark = data_spark.withColumn(
    "within_183_days",
    F.when(
        (F.col("date") <= F.date_sub(F.col("T0"), 90)) & 
        (F.col("date") > F.date_sub(F.col("T0"), 273)), 
        1
    ).otherwise(0)
)

# Calculer le nombre de jours avec consommation non nulle dans la période de 6 mois
data_spark = data_spark.withColumn(
    "non_null_days_v3",
    F.when((F.col("within_183_days") == 1) & (F.col("volume_l_value") > 0), 1).otherwise(0)
)

data_spark = data_spark.withColumn(
    "total_non_null_days_v3", 
    F.sum("non_null_days_v3").over(Window.partitionBy("sensor", "servicePoint", "transmitter"))
)

# Calculer v3
data_spark = data_spark.withColumn(
    "v3", F.col("total_non_null_days_v3") / 183
)



In [54]:
# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "T0", "v3").show(truncate=False)

+-----------+------------+-----------+----+---+
|sensor     |servicePoint|transmitter|T0  |v3 |
+-----------+------------+-----------+----+---+
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NULL|0.0|
|C04AE134021|9,84588E+11 |C01E00D9   |NU

In [56]:
# Filtrer les données sur une période d'un an avant T0
data_spark = data_spark.withColumn(
    "within_1_year",
    F.when((F.col("date") <= F.col("T0")) & (F.col("date") > F.date_sub(F.col("T0"), 365)), 1).otherwise(0)
)

# Marquer les jours avec consommation nulle
data_spark = data_spark.withColumn(
    "is_null", F.when((F.col("within_1_year") == 1) & (F.col("volume_l_value") == 0), 1).otherwise(0)
)

# Identifier les débuts et fins de périodes de consommation nulle
window_spec = Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date")
data_spark = data_spark.withColumn(
    "prev_is_null", F.lag("is_null").over(window_spec)
)
data_spark = data_spark.withColumn(
    "new_null_period",
    F.when((F.col("is_null") == 1) & ((F.col("prev_is_null").isNull()) | (F.col("prev_is_null") == 0)), 1).otherwise(0)
)

# Attribuer un identifiant unique à chaque période nulle
data_spark = data_spark.withColumn(
    "null_period_id",
    F.sum("new_null_period").over(window_spec)
)

# Calculer la longueur de chaque période nulle
null_period_lengths = data_spark.filter(F.col("is_null") == 1).groupBy(
    "sensor", "servicePoint", "transmitter", "null_period_id"
).agg(F.count("date").alias("period_length"))

# Calculer la durée moyenne des périodes nulles
avg_null_period_length = null_period_lengths.groupBy(
    "sensor", "servicePoint", "transmitter"
).agg(F.avg("period_length").alias("v4"))

# Joindre la colonne v4 au dataset principal
data_spark = data_spark.join(avg_null_period_length, ["sensor", "servicePoint", "transmitter"], "left")


In [57]:
# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "v4").show(truncate=False)


+-----------+------------+-----------+------------------+
|sensor     |servicePoint|transmitter|v4                |
+-----------+------------+-----------+------------------+
|C04AE134021|9,84588E+11 |C01E00D9   |1.864864864864865 |
|C04AE134021|9,84588E+11 |C01E00D9   |1.864864864864865 |
|C04AE134021|9,84588E+11 |C01E00D9   |1.864864864864865 |
|C04AE134021|9,84588E+11 |C01E00D9   |1.864864864864865 |
|C04AE134021|9,84588E+11 |C01E00D9   |1.864864864864865 |
|C07AA092853|9,89432E+11 |11A000060C8|8.0               |
|C07AA092853|9,89432E+11 |11A000060C8|8.0               |
|C07AA092853|9,89432E+11 |11A000060C8|8.0               |
|C07AA092853|9,89432E+11 |11A000060C8|8.0               |
|C07AA092853|9,89432E+11 |11A000060C8|8.0               |
|C08EB001382|9,85226E+11 |C24F74E5   |9.090909090909092 |
|C08EB001382|9,85226E+11 |C24F74E5   |9.090909090909092 |
|C08EB001382|9,85226E+11 |C24F74E5   |9.090909090909092 |
|C08EB001382|9,85226E+11 |C24F74E5   |9.090909090909092 |
|C08FA041975|9

In [72]:
# Identifier les jours de consommation nulle
data_spark = data_spark.withColumn(
    "is_null", F.when((F.col("within_1_year") == 1) & (F.col("volume_l_value") == 0), 1).otherwise(0)
)

# Identifier les débuts de périodes nulles
data_spark = data_spark.withColumn(
    "prev_is_null", F.lag("is_null").over(Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date"))
)
data_spark = data_spark.withColumn(
    "new_null_period",
    F.when((F.col("is_null") == 1) & ((F.col("prev_is_null").isNull()) | (F.col("prev_is_null") == 0)), 1).otherwise(0)
)

# Identifier chaque période de consommation nulle avec un ID unique
data_spark = data_spark.withColumn(
    "null_period_id",
    F.sum("new_null_period").over(Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date"))
)

# Calculer la longueur de chaque période nulle
null_period_lengths = data_spark.filter(F.col("is_null") == 1).groupBy(
    "sensor", "servicePoint", "transmitter", "null_period_id"
).agg(F.count("date").alias("period_length"))

# Trouver la durée maximale des périodes nulles (v5)
max_null_period_length = null_period_lengths.groupBy(
    "sensor", "servicePoint", "transmitter"
).agg(F.max("period_length").alias("v5"))

# Joindre la colonne v5 au dataset principal
data_spark = data_spark.join(max_null_period_length, ["sensor", "servicePoint", "transmitter"], "left")


In [73]:
# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "v5").show(truncate=False)

+-----------+------------+-----------+---+
|sensor     |servicePoint|transmitter|v5 |
+-----------+------------+-----------+---+
|C04AE134021|9,84588E+11 |C01E00D9   |6  |
|C04AE134021|9,84588E+11 |C01E00D9   |6  |
|C04AE134021|9,84588E+11 |C01E00D9   |6  |
|C04AE134021|9,84588E+11 |C01E00D9   |6  |
|C04AE134021|9,84588E+11 |C01E00D9   |6  |
|C07AA092853|9,89432E+11 |11A000060C8|48 |
|C07AA092853|9,89432E+11 |11A000060C8|48 |
|C07AA092853|9,89432E+11 |11A000060C8|48 |
|C07AA092853|9,89432E+11 |11A000060C8|48 |
|C07AA092853|9,89432E+11 |11A000060C8|48 |
|C08EB001382|9,85226E+11 |C24F74E5   |50 |
|C08EB001382|9,85226E+11 |C24F74E5   |50 |
|C08EB001382|9,85226E+11 |C24F74E5   |50 |
|C08EB001382|9,85226E+11 |C24F74E5   |50 |
|C08FA041975|9,80564E+11 |C2412C78   |113|
|C08FA041975|9,80564E+11 |C2412C78   |113|
|C08FA041975|9,80564E+11 |C2412C78   |113|
|C08FA041975|9,80564E+11 |C2412C78   |113|
|C08FA041975|9,80564E+11 |C2412C78   |113|
|C08FA085851|9,84967E+11 |C2ABDA49   |12 |
+----------

In [75]:
# Identifier les jours de consommation non nulle
data_spark = data_spark.withColumn(
    "is_non_null", F.when((F.col("within_1_year") == 1) & (F.col("volume_l_value") > 0), 1).otherwise(0)
)

# Identifier les débuts de périodes non nulles
data_spark = data_spark.withColumn(
    "prev_is_non_null", F.lag("is_non_null").over(Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date"))
)
data_spark = data_spark.withColumn(
    "new_non_null_period",
    F.when((F.col("is_non_null") == 1) & ((F.col("prev_is_non_null").isNull()) | (F.col("prev_is_non_null") == 0)), 1).otherwise(0)
)

# Identifier chaque période de consommation non nulle avec un ID unique
data_spark = data_spark.withColumn(
    "non_null_period_id",
    F.sum("new_non_null_period").over(Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date"))
)

# Calculer la longueur de chaque période non nulle
non_null_period_lengths = data_spark.filter(F.col("is_non_null") == 1).groupBy(
    "sensor", "servicePoint", "transmitter", "non_null_period_id"
).agg(F.count("date").alias("non_null_period_length"))

# Trouver la durée maximale des périodes non nulles (v6)
max_non_null_period_length = non_null_period_lengths.groupBy(
    "sensor", "servicePoint", "transmitter"
).agg(F.max("non_null_period_length").alias("v6"))

# Joindre la colonne v6 au dataset principal
data_spark = data_spark.join(max_non_null_period_length, ["sensor", "servicePoint", "transmitter"], "left")


In [76]:
# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "v6").show(truncate=False)

+-----------+------------+-----------+---+
|sensor     |servicePoint|transmitter|v6 |
+-----------+------------+-----------+---+
|C04AE134021|9,84588E+11 |C01E00D9   |250|
|C04AE134021|9,84588E+11 |C01E00D9   |250|
|C04AE134021|9,84588E+11 |C01E00D9   |250|
|C04AE134021|9,84588E+11 |C01E00D9   |250|
|C04AE134021|9,84588E+11 |C01E00D9   |250|
|C07AA092853|9,89432E+11 |11A000060C8|270|
|C07AA092853|9,89432E+11 |11A000060C8|270|
|C07AA092853|9,89432E+11 |11A000060C8|270|
|C07AA092853|9,89432E+11 |11A000060C8|270|
|C07AA092853|9,89432E+11 |11A000060C8|270|
|C08EB001382|9,85226E+11 |C24F74E5   |91 |
|C08EB001382|9,85226E+11 |C24F74E5   |91 |
|C08EB001382|9,85226E+11 |C24F74E5   |91 |
|C08EB001382|9,85226E+11 |C24F74E5   |91 |
|C08FA041975|9,80564E+11 |C2412C78   |323|
|C08FA041975|9,80564E+11 |C2412C78   |323|
|C08FA041975|9,80564E+11 |C2412C78   |323|
|C08FA041975|9,80564E+11 |C2412C78   |323|
|C08FA041975|9,80564E+11 |C2412C78   |323|
|C08FA085851|9,84967E+11 |C2ABDA49   |301|
+----------

In [77]:
# Identifier l'index mécanique à la dernière consommation significative (T0)
window_spec = Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Filtrer pour récupérer la valeur de l'index à T0
data_spark = data_spark.withColumn(
    "v7",
    F.when(F.col("date") == F.col("T0"), F.col("index_interpolated_d_l_value")).otherwise(None)
)

# Propager l'index mécanique (v7) pour chaque triplette
data_spark = data_spark.withColumn(
    "v7", F.last("v7", ignorenulls=True).over(window_spec)
)

# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "T0", "v7").show(truncate=False)


+-----------+------------+-----------+----------+---------+
|sensor     |servicePoint|transmitter|T0        |v7       |
+-----------+------------+-----------+----------+---------+
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-02|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-02|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-06|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-06|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-06|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-06|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-07|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-08|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-09|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-10|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-13|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-13|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |2019-01-13|7070442.0|
|C04AE134021|9,84588E+11 |C01E00D9   |20

In [80]:

# Identifier les jours de consommation non nulle dans l'année précédant T0
data_spark = data_spark.withColumn(
    "is_non_null", F.when((F.col("within_1_year") == 1) & (F.col("volume_l_value") > 0), 1).otherwise(0)
)

# Identifier les débuts de périodes non nulles
window_spec = Window.partitionBy("sensor", "servicePoint", "transmitter").orderBy("date")
data_spark = data_spark.withColumn(
    "prev_is_non_null", F.lag("is_non_null").over(window_spec)
)
data_spark = data_spark.withColumn(
    "new_non_null_period",
    F.when((F.col("is_non_null") == 1) & ((F.col("prev_is_non_null").isNull()) | (F.col("prev_is_non_null") == 0)), 1).otherwise(0)
)

# Calculer le nombre total de périodes non nulles (v8)
data_spark = data_spark.withColumn(
    "v8", F.sum("new_non_null_period").over(Window.partitionBy("sensor", "servicePoint", "transmitter"))
)

# Résultat
data_spark.select("sensor", "servicePoint", "transmitter", "v8").show(truncate=False)


+-----------+------------+-----------+---+
|sensor     |servicePoint|transmitter|v8 |
+-----------+------------+-----------+---+
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
|C04AE134021|9,84588E+11 |C01E00D9   |37 |
+----------

In [83]:

# Extraire les 5 premiers caractères du champ sensor pour obtenir le millesime
data_spark = data_spark.withColumn("millesime", F.substring(F.col("sensor"), 1, 5))

# Appliquer les règles pour calculer v9
data_spark = data_spark.withColumn(
    "v9",
    F.when(
        (F.col("millesime").isin("C10FA", "C10LA", "C10SA", "C11FA", "C11LA", "C11SA")) & (F.col("index_interpolated_d_l_value") > 3),
        0
    ).when(
        F.col("millesime").isin("D16BU", "Z12ER", "C07AA"),
        1
    ).otherwise(2)
)

# Résultat
data_spark.select("sensor", "millesime", "index_interpolated_d_l_value", "v9").show(truncate=False)


+-----------+---------+----------------------------+---+
|sensor     |millesime|index_interpolated_d_l_value|v9 |
+-----------+---------+----------------------------+---+
|C04AE134021|C04AE    |5652487.0                   |2  |
|C04AE134021|C04AE    |6672885.0                   |2  |
|C04AE134021|C04AE    |6749679.5                   |2  |
|C04AE134021|C04AE    |6949835.0                   |2  |
|C04AE134021|C04AE    |6967624.0                   |2  |
|C07AA092853|C07AA    |3252572.0                   |1  |
|C07AA092853|C07AA    |3312573.0                   |1  |
|C07AA092853|C07AA    |3367554.0                   |1  |
|C07AA092853|C07AA    |3526363.0                   |1  |
|C07AA092853|C07AA    |3526385.5                   |1  |
|C08EB001382|C08EB    |487564.0                    |2  |
|C08EB001382|C08EB    |487564.0                    |2  |
|C08EB001382|C08EB    |496577.0                    |2  |
|C08EB001382|C08EB    |526975.0                    |2  |
|C08FA041975|C08FA    |551811.0