In [1]:
import pandas as pd

# Chargement des fichiers CSV
df_incidents = pd.read_csv("../../data/raw/Cleaned_data/InUSE/cleaned_data_incidents.csv")
df_mobilisations = pd.read_csv("../../data/raw/Cleaned_data/InUSE/cleaned_data_mobilisations.csv")

# Vérification du chargement
print("Données incidents clean chargées:")
display(df_incidents.head()) # Use .head() for better display in non-notebook environments
print("Données mobilisations clean chargées:")
display(df_mobilisations.head()) # Use .head() for better display in non-notebook environments

# Vérifier la période des données d'incidents et de mobilisation: doit etre de 2009 à 2025
print("Période des données d'incidents chargées: De", min(df_incidents["CalYear"]), "à", max(df_incidents["CalYear"]))
print("Période des données de mobilisations chargées: De", min(df_mobilisations["CalYear"]), "à", max(df_mobilisations["CalYear"]))

print("\nVérification taille:")
print("Incidents :", len(df_incidents), " | Mobilisations :", len(df_mobilisations))

# Colonnes à garder pour la table mobilisations (This part is commented out in the PySpark section,
# but was present in the Pandas section of the original code, so keeping it here for context)
# mobilisations_cols_to_keep = [
#     'IncidentNumber', 'CalYear', 'BoroughName', 'WardName', 'HourOfCall',
#     'DateAndTimeMobilised', 'DateAndTimeMobile', 'DateAndTimeArrived',
#     'TurnoutTimeSeconds', 'TravelTimeSeconds', 'AttendanceTimeSeconds',
#     'DeployedFromStation_Name', 'DeployedFromLocation'
# ]
# df_mobilisations_reduced = df_mobilisations[mobilisations_cols_to_keep].copy()

# Prétraitement de la colonne IncidentNumber
def clean_incident_number_pandas(value):
    """Nettoyage de l'identifiant d'incident"""
    if pd.isna(value):
        return None
    value = str(value)
    if '.' in value:
        return value.split('.')[0]
    elif '-' in value:
        return value.split('-')[0]
    return value

print("\nType IncidentNumber (avant nettoyage - Incidents) :", df_incidents["IncidentNumber"].dtype)
print("Type IncidentNumber (avant nettoyage - Mobilisations) :", df_mobilisations["IncidentNumber"].dtype)

# Application du nettoyage aux deux jeux de données
df_incidents["incident_id_cleaned"] = df_incidents["IncidentNumber"].apply(clean_incident_number_pandas)
df_mobilisations["incident_id_cleaned"] = df_mobilisations["IncidentNumber"].apply(clean_incident_number_pandas)

print("Type IncidentNumber (après nettoyage - Incidents):", df_incidents["incident_id_cleaned"].dtype)
print("Type IncidentNumber (après nettoyage - Mobilisations):", df_mobilisations["incident_id_cleaned"].dtype)

# Jointure sur la colonne nettoyée
df_joined_pandas = pd.merge(df_mobilisations, df_incidents, on="incident_id_cleaned", how="inner")

# Count après jointure
print("\nCount après jointure (INNER JOIN sur incident_id_cleaned - Pandas) :", len(df_joined_pandas))

# % de jointure réussie par rapport à la table mobilisations
joined_ratio_pandas = len(df_joined_pandas) / len(df_mobilisations)
print(f"Taux de jointure réussie (Pandas) : {joined_ratio_pandas:.2%}")

# Vérifier après jointure de la période des données d'incidents et de mobilisation: doit etre de 2009 à 2025
print("Période des données jointes (Pandas): De", min(df_joined_pandas["CalYear_x"]), "à", max(df_joined_pandas["CalYear_x"])) # CalYear_x from mobilisations
print("Période des données jointes (Pandas): De", min(df_joined_pandas["CalYear_y"]), "à", max(df_joined_pandas["CalYear_y"])) # CalYear_y from incidents


print("\n" + "="*50)
print("Démarrage de la section PySpark")
print("="*50 + "\n")

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, regexp_extract, min as spark_min, max as spark_max
import os

# Stop any existing Spark session (important si redémarré en cours de route)
try:
    spark.stop()
except:
    pass

# Démarrage d'une nouvelle session Spark
spark = SparkSession.builder \
    .appName("Jointure Incidents & Mobilisations") \
    .master("local[*]") \
    .getOrCreate()

print("SparkSession initialisée")

# 2. Chargement des fichiers nettoyés
path_incidents = "../../data/raw/Cleaned_data/InUSE/cleaned_data_incidents.csv"
path_mobilisations = "../../data/raw/Cleaned_data/InUSE/cleaned_data_mobilisations.csv"

# Optionnel : Vérifie si les fichiers existent (Python natif)
assert os.path.exists(path_incidents), f"Fichier introuvable : {path_incidents}"
assert os.path.exists(path_mobilisations), f"Fichier introuvable : {path_mobilisations}"

# Chargement CSV
df_incidents_spark = spark.read.option("header", True).option("inferSchema", "true").csv(path_incidents)
df_mobilisations_spark = spark.read.option("header", True).option("inferSchema", "true").csv(path_mobilisations)

print("Données chargées dans Spark")

# 3. Aperçu rapide
print("Échantillon incidents (Spark)")
df_incidents_spark.show(3, truncate=False)
print("Échantillon mobilisations (Spark)")
df_mobilisations_spark.show(3, truncate=False)

# 4. Vérification de la période des données
print("Période des incidents (Spark) :")
df_incidents_spark.select(spark_min("CalYear"), spark_max("CalYear")).show()
print("Période des mobilisations (Spark) :")
df_mobilisations_spark.select(spark_min("CalYear"), spark_max("CalYear")).show()

# 5. Nettoyage IncidentNumber for both DataFrames
def clean_incident_number_spark(col_):
    return when(col_.contains('.'), regexp_extract(col_, r"^([^\.]+)", 1)) \
        .when(col_.contains('-'), regexp_extract(col_, r"^([^-]+)", 1)) \
        .otherwise(col_)

df_incidents_cleaned_spark = df_incidents_spark.withColumn(
    "incident_id_cleaned", clean_incident_number_spark(col("IncidentNumber")))
df_mobilisations_cleaned_spark = df_mobilisations_spark.withColumn(
    "incident_id_cleaned", clean_incident_number_spark(col("IncidentNumber")))

# IMPORTANT ADDITION: Drop duplicates from df_incidents_cleaned based on the cleaned incident ID
# This ensures a one-to-many relationship where each incident is unique.
df_incidents_unique_spark = df_incidents_cleaned_spark.dropDuplicates(['incident_id_cleaned'])
print(f"Nombre d'incidents uniques après nettoyage (Spark) : {df_incidents_unique_spark.count():,}")

# 6. Jointure sur la colonne id nettoyée
# Rename 'CalYear' in incidents to avoid ambiguity after join
df_incidents_unique_spark = df_incidents_unique_spark.withColumnRenamed("CalYear", "CalYear_inc")
df_mobilisations_cleaned_spark = df_mobilisations_cleaned_spark.withColumnRenamed("CalYear", "CalYear_mob")
df_mobilisations_cleaned_spark = df_mobilisations_cleaned_spark.withColumnRenamed("HourOfCall", "HourOfCall_mob")


# Join mobilisations to incidents, as one mobilization links to one incident.
df_joined_spark = df_mobilisations_cleaned_spark.join(
    df_incidents_unique_spark, # Use the DataFrame with unique incident IDs
    on="incident_id_cleaned",
    how="inner"
)

# 7. Statistiques de jointure
nb_incidents_original_spark = df_incidents_spark.count()
nb_mobilisations_original_spark = df_mobilisations_spark.count()
nb_jointure_spark = df_joined_spark.count()

print(f"\nNombre d'incidents (original - Spark) : {nb_incidents_original_spark:,}")
print(f"Nombre de mobilisations (original - Spark) : {nb_mobilisations_original_spark:,}")
print(f"Nombre d'incidents uniques utilisés pour la jointure (Spark) : {df_incidents_unique_spark.count():,}")
print(f"Lignes jointes (Spark) : {nb_jointure_spark:,}")

# The join ratio should now ideally be closer to 100% of mobilisations if all mobilisations have matching incidents.
print(f"Taux de jointure réussie (par rapport aux mobilisations - Spark) : {nb_jointure_spark / nb_mobilisations_original_spark:.2%}")
print(f"Taux de jointure réussie (par rapport aux incidents uniques - Spark) : {nb_jointure_spark / df_incidents_unique_spark.count():.2%}")


# Check for unmatched mobilisations
df_mobilisations_unmatched_spark = df_mobilisations_cleaned_spark.join(
    df_incidents_unique_spark, # Use the DataFrame with unique incident IDs
    on="incident_id_cleaned",
    how="left_anti"
)
print(f"Mobilisations non jointes (Spark) : {df_mobilisations_unmatched_spark.count():,}")

# 8. Affichage des 10 premières lignes après jointure
print("\nAperçu des données après jointure (Spark) :")
df_joined_spark.show(5, truncate=False)
print("Schema après jointure (Spark):")
df_joined_spark.printSchema()

colonnes_apres_jointure_spark = df_joined_spark.columns
nombre_colonnes_spark = len(colonnes_apres_jointure_spark)

print("\nListe des colonnes après jointure (Spark) :")
print(colonnes_apres_jointure_spark)
print(f"Nombre total de colonnes (Spark) : {nombre_colonnes_spark}")

# Étape 1 : Lister les colonnes des deux DataFrames
cols_incidents_spark = set(df_incidents_unique_spark.columns)
cols_mobilisations_spark = set(df_mobilisations_cleaned_spark.columns)

# Étape 2 : Trouver les colonnes communes
# We need to exclude the renamed 'CalYear_inc' and 'HourOfCall_inc' and 'IncidentNumber' from the initial
# list of columns to avoid double counting after renaming.
# The `on` column ('incident_id_cleaned') is handled correctly by the join.
common_columns_spark = (cols_incidents_spark.intersection(cols_mobilisations_spark)) - {'CalYear_inc', 'HourOfCall', 'IncidentNumber'} # Exclude explicitly renamed/dropped columns

print("\nColonnes présentes dans les deux DataFrames (possibles doublons après jointure, hors celles renommées):")
for col_name in sorted(common_columns_spark):
    print(f"- {col_name}")

print(f"\nNombre de colonnes communes (hors renommées/jointure key) : {len(common_columns_spark)}")


# Comparaison CalYear (Spark)
print("\nComparaison CalYear (Spark):")
df_joined_spark.select(
    (col("CalYear_mob") == col("CalYear_inc")).alias("same_CalYear")
).groupBy("same_CalYear").count().show()

# Comparaison HourOfCall (Spark)
print("Comparaison HourOfCall (Spark):")
# Note: HourOfCall was only present in mobilisations, and then it was named HourOfCall. In incidents, it should be
# 'HourOfCall'. If it exists in both, then this comparison is valid.
# Assuming 'HourOfCall' from incidents was renamed by Spark's default behavior, or if it was explicitly present.
# Let's assume 'HourOfCall' is the one from incidents, and 'HourOfCall_mob' is from mobilisations.
if "HourOfCall" in df_joined_spark.columns: # Check if 'HourOfCall' from incidents is present
    df_joined_spark.select(
        (col("HourOfCall_mob") == col("HourOfCall")).alias("same_HourOfCall")
    ).groupBy("same_HourOfCall").count().show()
else:
    print("HourOfCall from incidents not directly available for comparison after join, or was renamed.")


# Comparaison IncidentNumber (Spark) - this is less meaningful after using 'incident_id_cleaned' as join key
# and assuming 'IncidentNumber' from incidents is kept as 'IncidentNumber' and from mobilisations as 'IncidentNumber'.
# Spark typically renames duplicated columns unless specified. Let's check how they are named.
print("Comparaison IncidentNumber (Spark):")
# Assuming the IncidentNumber from mobilisations is 'IncidentNumber' and from incidents is 'IncidentNumber_inc' or similar.
# We should check the column names in df_joined_spark.
# Based on the schema, it looks like 'IncidentNumber' comes from mobilisations and 'IncidentNumber' (from incidents)
# gets renamed by Spark's default behavior to 'IncidentNumber_incidents' or similar if not explicitly dropped/renamed.
# Re-checking the actual columns after join:
# If both original 'IncidentNumber' columns are kept, Spark will append _x and _y (or similar).
# Let's verify the column names and then compare.
# From the df_joined.printSchema() earlier, we can infer how Spark handled it.
# If `df_joined_spark` contains 'IncidentNumber' (from mobilisations) and 'IncidentNumber' (from incidents),
# Spark will make one of them 'IncidentNumber' and the other 'IncidentNumber_inc' or similar.
# Based on the schema output:
# 'IncidentNumber' will be from mobilisations, and 'IncidentNumber_inc' will be from incidents.
if "IncidentNumber_inc" in df_joined_spark.columns:
    df_joined_spark.select(
        (col("IncidentNumber") == col("IncidentNumber_inc")).alias("same_IncidentNumber")
    ).groupBy("same_IncidentNumber").count().show()
else:
    print("IncidentNumber from incidents not found as 'IncidentNumber_inc' for direct comparison.")
    # If the column name is just 'IncidentNumber' from incidents, and another one is also 'IncidentNumber', this is an issue.
    # The previous `dropDuplicates` on incidents should have handled this, but the comparison still implies having both.


print("\nPériode des données jointes (CalYear_mob - Spark) :")
periode_mob = df_joined_spark.agg(
    spark_min("CalYear_mob").alias("Annee_min"),
    spark_max("CalYear_mob").alias("Annee_max")
).collect()[0]
print(f"Période des données (CalYear_mob) : de {periode_mob['Annee_min']} à {periode_mob['Annee_max']}")

print("\nPériode des données jointes (CalYear_inc - Spark) :")
periode_inc = df_joined_spark.agg(
    spark_min("CalYear_inc").alias("Annee_min"),
    spark_max("CalYear_inc").alias("Annee_max")
).collect()[0]
print(f"Période des données (CalYear_inc) : de {periode_inc['Annee_min']} à {periode_inc['Annee_max']}")

# Optional: Stop Spark session if you're done with operations
spark.stop()

  df_incidents = pd.read_csv("../../data/raw/Cleaned_data/InUSE/cleaned_data_incidents.csv")
  df_mobilisations = pd.read_csv("../../data/raw/Cleaned_data/InUSE/cleaned_data_mobilisations.csv")


Données incidents clean chargées:


Unnamed: 0,IncidentNumber,DateOfCall,CalYear,TimeOfCall,HourOfCall,IncidentGroup,StopCodeDescription,SpecialServiceType,PropertyCategory,PropertyType,...,FirstPumpArriving_AttendanceTime,FirstPumpArriving_DeployedFromStation,SecondPumpArriving_AttendanceTime,SecondPumpArriving_DeployedFromStation,NumStationsWithPumpsAttending,NumPumpsAttending,PumpCount,PumpMinutesRounded,NotionalCost,NumCalls
0,235138081.0,2009-01-01,2009,00:00:37,0,Special Service,Special Service,RTC,Road Vehicle,Car,...,319.0,Battersea,342.0,Clapham,2.0,2.0,2,60,255,1.0
1,1091.0,2009-01-01,2009,00:00:46,0,Special Service,Special Service,Assist other agencies,Outdoor,Lake/pond/reservoir,...,,,,,,,1,60,255,1.0
2,2091.0,2009-01-01,2009,00:03:00,0,Fire,Secondary Fire,,Outdoor,Road surface/pavement,...,308.0,Edmonton,,,1.0,1.0,1,60,255,2.0
3,3091.0,2009-01-01,2009,00:04:27,0,Fire,Secondary Fire,,Outdoor,Domestic garden (vegetation not equipment),...,210.0,Hillingdon,,,1.0,1.0,1,60,255,2.0
4,5091.0,2009-01-01,2009,00:05:39,0,Fire,Secondary Fire,,Outdoor,Cycle path/public footpath/bridleway,...,233.0,Holloway,250.0,Holloway,1.0,2.0,2,60,255,1.0


Données mobilisations clean chargées:


Unnamed: 0,IncidentNumber,CalYear,BoroughName,WardName,HourOfCall,ResourceMobilisationId,Resource_Code,PerformanceReporting,DateAndTimeMobilised,DateAndTimeMobile,...,DateAndTimeLeft,DateAndTimeReturned,DeployedFromStation_Code,DeployedFromStation_Name,DeployedFromLocation,PumpOrder,PlusCode_Code,PlusCode_Description,DelayCodeId,DelayCode_Description
0,000004-01012025,2025,HAMMERSMITH AND FULHAM,FULHAM REACH,0,6862256,H331,2,2025-01-01 00:02:00,2025-01-01 00:07:00,...,2025-01-01 00:23:00,,H33,Wandsworth,Home Station,2,Initial,Initial Mobilisation,12.0,Not held up
1,000004-01012025,2025,HAMMERSMITH AND FULHAM,FULHAM REACH,0,6862257,G261,1,2025-01-01 00:02:00,2025-01-01 00:02:00,...,2025-01-01 00:38:00,,G36,Hammersmith,Other Station,1,Initial,Initial Mobilisation,12.0,Not held up
2,000005-01012025,2025,MERTON,WEST BARNES,0,6862259,H401,1,2025-01-01 00:03:00,2025-01-01 00:04:00,...,2025-01-01 00:11:00,,H40,New Malden,Home Station,1,Initial,Initial Mobilisation,,
3,000006-01012025,2025,CROYDON,PURLEY OAKS & RIDDLESDOWN,0,6862260,H291,1,2025-01-01 00:04:00,2025-01-01 00:06:00,...,2025-01-01 00:25:00,,H29,Purley,Home Station,1,Initial,Initial Mobilisation,,
4,000007-01012025,2025,BARNET,BURNT OAK,0,6862261,G222,1,2025-01-01 00:05:00,2025-01-01 00:06:00,...,2025-01-01 00:27:00,,G22,Stanmore,Home Station,1,Initial,Initial Mobilisation,,


Période des données d'incidents chargées: De 2009 à 2025
Période des données de mobilisations chargées: De 2009 à 2025

Vérification taille:
Incidents : 1824973  | Mobilisations : 2578276

Type IncidentNumber (avant nettoyage - Incidents) : object
Type IncidentNumber (avant nettoyage - Mobilisations) : object
Type IncidentNumber (après nettoyage - Incidents): object
Type IncidentNumber (après nettoyage - Mobilisations): object

Count après jointure (INNER JOIN sur incident_id_cleaned - Pandas) : 9949296
Taux de jointure réussie (Pandas) : 385.89%
Période des données jointes (Pandas): De 2009 à 2025
Période des données jointes (Pandas): De 2009 à 2025

Démarrage de la section PySpark



PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.