## Lab 1

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, avg, count

In [22]:
# Obtain the Spark session for this notebook (an already-running session is
# reused, otherwise a new one is started) under the project application name.
spark = (
    SparkSession.builder
    .appName("Projet-Final")
    .getOrCreate()
)
print('Spark session created:', spark)

Spark session created: <pyspark.sql.session.SparkSession object at 0x000001CE36D3FD50>


In [23]:
# Load the raw sensor readings. The first CSV row supplies the column names
# and column types are inferred from the data.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./data/equipment_anomaly_data.csv")
)
df.show(n=5)

+-------------------+----------+-----------------+-----------------+------------------+------------------+----------+-------------+------+
|          timestamp|      date|      temperature|         pressure|         vibration|          humidity| equipment|     location|faulty|
+-------------------+----------+-----------------+-----------------+------------------+------------------+----------+-------------+------+
|2025-01-01 08:00:00|2025-01-01|58.18018003931781|25.02927765103301|0.6065162172245139|45.694907104076414|   Turbine|      Atlanta|   0.0|
|2025-01-01 08:00:10|2025-01-01|75.74071220894001|22.95401759548667| 2.338094753751008| 41.86740679261492|Compressor|      Chicago|   0.0|
|2025-01-01 08:00:20|2025-01-01|71.35859424081657|27.27683031893662|1.3891983049086754| 58.95440890948324|   Turbine|San Francisco|   0.0|
|2025-01-01 08:00:30|2025-01-01|71.61698526704753|32.24292130393475|1.7706896863176191| 40.56513821185597|      Pump|      Atlanta|   0.0|
|2025-01-01 08:00:40|2025-0

In [24]:
# Print the inferred column names, types and nullability of the raw readings.
df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- temperature: double (nullable = true)
 |-- pressure: double (nullable = true)
 |-- vibration: double (nullable = true)
 |-- humidity: double (nullable = true)
 |-- equipment: string (nullable = true)
 |-- location: string (nullable = true)
 |-- faulty: double (nullable = true)



In [25]:
# Cleaning step:
#   1. discard any reading with a null sensor value or a null equipment label;
#   2. where the fault flag is null, record it as 0.0.
df_clean = (
    df
    .na.drop(subset=["temperature", "pressure", "vibration", "humidity", "equipment"])
    .na.fill({"faulty": 0.0})
)


In [26]:
# Tag every cleaned reading with a health category.
#
# `when(...).when(...)` branches are evaluated in order and the first match
# wins, so the most severe category is tested FIRST. Previously "warning"
# (vibration in [50, 70)) was tested before "critical", which meant a reading
# with warning-level vibration but temperature >= 80 or pressure >= 200 was
# labelled "warning" even though it satisfies the critical condition.
#
#   critical : temperature >= 80, or pressure >= 200, or vibration >= 70
#   warning  : vibration in [50, 70) and not critical
#   normal   : vibration < 50, temperature < 80 and pressure < 200
#   unknown  : anything that matches none of the above
#
# NOTE(review): vibration values in the displayed sample are roughly 0.6-2.3,
# so the 50/70 vibration bands may never trigger on this dataset -- confirm
# the thresholds (or units) with whoever defined them.
df_enriched = df_clean.withColumn(
    "health_category",
    when((col("temperature") >= 80) | (col("pressure") >= 200) | (col("vibration") >= 70), "critical")
    .when((col("vibration") >= 50) & (col("vibration") < 70), "warning")
    .when((col("vibration") < 50) & (col("temperature") < 80) & (col("pressure") < 200), "normal")
    .otherwise("unknown")
)
df_enriched.show(5)

+-------------------+----------+-----------------+-----------------+------------------+------------------+----------+-------------+------+---------------+
|          timestamp|      date|      temperature|         pressure|         vibration|          humidity| equipment|     location|faulty|health_category|
+-------------------+----------+-----------------+-----------------+------------------+------------------+----------+-------------+------+---------------+
|2025-01-01 08:00:00|2025-01-01|58.18018003931781|25.02927765103301|0.6065162172245139|45.694907104076414|   Turbine|      Atlanta|   0.0|         normal|
|2025-01-01 08:00:10|2025-01-01|75.74071220894001|22.95401759548667| 2.338094753751008| 41.86740679261492|Compressor|      Chicago|   0.0|         normal|
|2025-01-01 08:00:20|2025-01-01|71.35859424081657|27.27683031893662|1.3891983049086754| 58.95440890948324|   Turbine|San Francisco|   0.0|         normal|
|2025-01-01 08:00:30|2025-01-01|71.61698526704753|32.24292130393475|1.

In [27]:
# For each (equipment, health_category) pair, compute the mean of every sensor
# metric (aliased avg_<metric>) followed by the number of rows in the group.
df_agg = (
    df_enriched
    .groupBy("equipment", "health_category")
    .agg(
        *[
            avg(metric).alias(f"avg_{metric}")
            for metric in ("vibration", "temperature", "pressure", "humidity")
        ],
        count("*").alias("record_count"),
    )
)
df_agg.show(n=5)

+----------+---------------+------------------+-----------------+------------------+------------------+------------+
| equipment|health_category|     avg_vibration|  avg_temperature|      avg_pressure|      avg_humidity|record_count|
+----------+---------------+------------------+-----------------+------------------+------------------+------------+
|   Turbine|       critical|  1.80083531490391|93.90281591073115| 37.76805313731594| 49.15736323471513|         452|
|Compressor|       critical|1.7558697650204935| 93.2707744438854| 36.55402631824557|50.503897248017026|         526|
|      Pump|         normal|1.5677290655300709|65.53427429076656|35.184459849628766|50.089306426076014|        2042|
|   Turbine|         normal|1.5746258441308443|65.67025309635089|35.445976244629556| 49.78657371620923|        2113|
|      Pump|       critical|1.8025541631446096|  91.800079163731|37.474128614717706|50.645782038507676|         492|
+----------+---------------+------------------+-----------------

In [28]:
# Persist the aggregated table as Parquet, replacing any previous output at
# the same location.
df_agg.write.parquet(
    "output_projet_final/equipment_health_agg.parquet",
    mode="overwrite",
)


In [29]:
# Read the Parquet output back and display it to check the write.
df_result = (
    spark.read
    .format("parquet")
    .load("output_projet_final/equipment_health_agg.parquet")
)
df_result.show()

+----------+---------------+------------------+-----------------+------------------+------------------+------------+
| equipment|health_category|     avg_vibration|  avg_temperature|      avg_pressure|      avg_humidity|record_count|
+----------+---------------+------------------+-----------------+------------------+------------------+------------+
|   Turbine|       critical|  1.80083531490391|93.90281591073115| 37.76805313731594| 49.15736323471513|         452|
|Compressor|       critical|1.7558697650204935| 93.2707744438854| 36.55402631824557|50.503897248017026|         526|
|      Pump|         normal|1.5677290655300709|65.53427429076656|35.184459849628766|50.089306426076014|        2042|
|   Turbine|         normal|1.5746258441308443|65.67025309635089|35.445976244629556| 49.78657371620923|        2113|
|      Pump|       critical|1.8025541631446096|  91.800079163731|37.474128614717706|50.645782038507676|         492|
|Compressor|         normal|1.5695609646421145|65.88416239585283

## Lab 2

In [46]:
from pyspark.sql.functions import hour, col

# Count readings per (hour of day, equipment) pair.
# NOTE(review): this groups the raw `df`, not `df_clean` -- confirm that rows
# dropped by the cleaning step are meant to be counted here.
df_grouped = (
    df
    .groupBy(hour("timestamp").alias("hour"), "equipment")
    .count()
)
df_grouped.show()


+----+----------+-----+
|hour| equipment|count|
+----+----------+-----+
|  16|Compressor|  122|
|   8|      Pump|  106|
|  16|   Turbine|  119|
|  14|      Pump|  116|
|  20|      Pump|  131|
|   8|Compressor|  121|
|  13|   Turbine|  115|
|  16|      Pump|  119|
|   4|Compressor|  126|
|  22|   Turbine|  123|
|   3|   Turbine|  116|
|  17|      Pump|  117|
|  21|   Turbine|  115|
|   3|Compressor|  124|
|  23|Compressor|  128|
|  12|      Pump|  122|
|   1|   Turbine|  124|
|   2|   Turbine|  102|
|  15|      Pump|  115|
|  22|Compressor|  135|
+----+----------+-----+
only showing top 20 rows


In [47]:
# Load the equipment metadata CSV (header row present, column types inferred).
metadata_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/equipment_metadata.csv")
)
metadata_df.show(n=5)

+------------+--------------+---------+------------+------------+----------------+
|equipment_id|equipment_type| location|manufacturer|install_date|last_maintenance|
+------------+--------------+---------+------------+------------+----------------+
|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|      EQ-002|       Turbine|     Lyon|          GE|  2019-07-22|      2024-03-15|
|      EQ-003|    Compressor|    Paris|     Siemens|  2020-11-05|      2024-04-18|
|      EQ-004|        Boiler| Toulouse|         EDF|  2021-09-09|      2024-05-02|
|      EQ-005|         Valve|    Lille|         ABB|  2018-12-17|      2024-02-12|
+------------+--------------+---------+------------+------------+----------------+
only showing top 5 rows


In [49]:
# Attach equipment metadata to the hourly counts with a left join on the
# equipment type, so count rows without a metadata match are kept.
# NOTE(review): the metadata holds several units per type (the output shows
# EQ-003 and EQ-013 both as Compressor), so each count row is repeated once
# per matching unit -- confirm this fan-out is intended.
df_enriched = df_grouped.join(
    metadata_df,
    on=df_grouped["equipment"] == metadata_df["equipment_type"],
    how="left",
)
df_enriched.show(n=5)


+----+----------+-----+------------+--------------+---------+------------+------------+----------------+
|hour| equipment|count|equipment_id|equipment_type| location|manufacturer|install_date|last_maintenance|
+----+----------+-----+------------+--------------+---------+------------+------------+----------------+
|  16|Compressor|  122|      EQ-013|    Compressor|    Nancy|     Siemens|  2020-10-29|      2024-05-07|
|  16|Compressor|  122|      EQ-003|    Compressor|    Paris|     Siemens|  2020-11-05|      2024-04-18|
|   8|      Pump|  106|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|  16|   Turbine|  119|      EQ-012|       Turbine|    Paris|          GE|  2019-08-01|      2024-03-27|
|  16|   Turbine|  119|      EQ-002|       Turbine|     Lyon|          GE|  2019-07-22|      2024-03-15|
+----+----------+-----+------------+--------------+---------+------------+------------+----------------+
only showing top 5 rows


In [50]:
# Write the joined table as Parquet, partitioned on the `equipment` column and
# replacing any previous output at the same location.
df_enriched.write.parquet(
    "output_projet_final/output_parquet_by_equipment",
    mode="overwrite",
    partitionBy="equipment",
)


## Lab 3

In [52]:
# Print the extended plan (logical plans as well as the physical plan) for the
# joined DataFrame.
df_enriched.explain(extended=True)

== Parsed Logical Plan ==
Join LeftOuter, (equipment#272 = equipment_type#1030)
:- Aggregate [hour(timestamp#567, Some(Europe/Paris)), equipment#272], [hour(timestamp#567, Some(Europe/Paris)) AS hour#984, equipment#272, count(1) AS count#985L]
:  +- Project [to_timestamp(timestamp#266, None, TimestampType, Some(Europe/Paris), true) AS timestamp#567, date#526, temperature#268, pressure#269, vibration#270, humidity#271, equipment#272, location#273, faulty#274, hour#527]
:     +- Project [timestamp#266, date#526, temperature#268, pressure#269, vibration#270, humidity#271, equipment#272, location#273, faulty#274, hour(timestamp#266, Some(Europe/Paris)) AS hour#527]
:        +- Project [timestamp#266, date_format(timestamp#266, yyyy-MM-dd, Some(Europe/Paris)) AS date#526, temperature#268, pressure#269, vibration#270, humidity#271, equipment#272, location#273, faulty#274, hour#525]
:           +- Project [timestamp#266, date#524, temperature#268, pressure#269, vibration#270, humidity#271, eq

In [53]:
# Repartition the joined table by the `equipment` column, then preview it.
df_repart = df_enriched.repartition(col("equipment"))
df_repart.show(n=5)

+----+---------+-----+------------+--------------+---------+------------+------------+----------------+
|hour|equipment|count|equipment_id|equipment_type| location|manufacturer|install_date|last_maintenance|
+----+---------+-----+------------+--------------+---------+------------+------------+----------------+
|   8|     Pump|  106|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|  14|     Pump|  116|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|  20|     Pump|  131|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|  16|     Pump|  119|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|  17|     Pump|  117|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
+----+---------+-----+------------+--------------+---------+------------+------------+----------------+
only showing top 5 rows


In [54]:
from pyspark.sql.functions import broadcast

# Same left join on equipment type as `df_enriched`, but the metadata side is
# wrapped in broadcast() to mark it as a broadcast-join candidate.
df_final = df_grouped.join(
    broadcast(metadata_df),
    on=df_grouped["equipment"] == metadata_df["equipment_type"],
    how="left",
)
df_final.show(n=5)

+----+----------+-----+------------+--------------+---------+------------+------------+----------------+
|hour| equipment|count|equipment_id|equipment_type| location|manufacturer|install_date|last_maintenance|
+----+----------+-----+------------+--------------+---------+------------+------------+----------------+
|  16|Compressor|  122|      EQ-013|    Compressor|    Nancy|     Siemens|  2020-10-29|      2024-05-07|
|  16|Compressor|  122|      EQ-003|    Compressor|    Paris|     Siemens|  2020-11-05|      2024-04-18|
|   8|      Pump|  106|      EQ-001|          Pump|Marseille|   Schneider|  2018-03-14|      2024-01-10|
|  16|   Turbine|  119|      EQ-012|       Turbine|    Paris|          GE|  2019-08-01|      2024-03-27|
|  16|   Turbine|  119|      EQ-002|       Turbine|     Lyon|          GE|  2019-07-22|      2024-03-15|
+----+----------+-----+------------+--------------+---------+------------+------------+----------------+
only showing top 5 rows


In [55]:
# Mark df_enriched for caching. cache() returns the DataFrame itself, which is
# why the cell displays its schema summary.
# NOTE(review): caching is lazy in Spark -- data is stored only when an action
# later runs on this DataFrame.
df_enriched.cache()

DataFrame[hour: int, equipment: string, count: bigint, equipment_id: string, equipment_type: string, location: string, manufacturer: string, install_date: date, last_maintenance: date]

In [56]:
# Directory where checkpoint files are written; it must be set before
# checkpoint() is called.
spark.sparkContext.setCheckpointDir("output_projet_final/checkpoints")

# checkpoint() does not modify df_enriched: it returns a NEW DataFrame backed
# by the checkpoint files. The result was previously discarded, so the
# checkpoint was written but never used. Keep a reference to it; df_enriched
# itself is left untouched so later cells behave as before.
df_checkpointed = df_enriched.checkpoint(eager=True)

# Last expression: display the checkpointed DataFrame's schema summary.
df_checkpointed


DataFrame[hour: int, equipment: string, count: bigint, equipment_id: string, equipment_type: string, location: string, manufacturer: string, install_date: date, last_maintenance: date]

In [57]:
import time

# Time a full count of the plain-join DataFrame.
# perf_counter() is used instead of time() because it is a monotonic,
# high-resolution clock meant for measuring elapsed intervals; time() follows
# the system wall clock, which can be adjusted and has coarser resolution on
# some platforms.
# NOTE(review): df_enriched was marked cache()d in an earlier cell, so this
# figure may include (or benefit from) caching -- confirm the comparison with
# df_final is like-for-like before drawing conclusions.
start = time.perf_counter()
df_enriched.count()
end = time.perf_counter()
print("Avant optimisation :", end - start, "secondes")


Avant optimisation : 3.208066463470459 secondes


In [58]:
# Time a full count of the broadcast-join DataFrame.
# perf_counter() is used instead of time() because it is a monotonic,
# high-resolution clock meant for measuring elapsed intervals.
# Relies on `time` having been imported by an earlier cell.
start = time.perf_counter()
df_final.count()
end = time.perf_counter()
print("Après optimisation :", end - start, "secondes")


Après optimisation : 1.4035847187042236 secondes
