In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [2]:
# Initialiser SparkSession
spark = SparkSession.builder \
    .appName("Data Fusion - Weather and Accidents") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/24 18:55:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Charger les données méteo et accidents
weather_df2 = spark.read.option("multiLine", True).json("/Users/madina/Desktop/data-lake/weather_data2.json")
accidents_df = spark.read.csv("/Users/madina/Desktop/datalake/csv-accident/road clean.csv", header=True, inferSchema=True)


                                                                                

In [4]:
print(weather_df2.count())
print(accidents_df.count())

1944
3059


On affiche les AccidentIndex du CSV qui n'existent pas dans le JSON.

In [6]:
#Vérifier les clés manquantes
missing_keys = accidents_df.join(weather_df2, on="AccidentIndex", how="left_anti")
print("Nombre de clés manquantes :", missing_keys.count())
missing_keys.show()

Nombre de clés manquantes : 1130
+--------------------+-------------+-----------+----------------+---------------+-----------------+---------+----------------+--------------------------+---------+--------------------+------------------+-------------------+-----------------------+---------+-----------+-------------------+-------------------+------------+
|       AccidentIndex|Accident Date|Day_of_Week|Junction_Control|Junction_Detail|Accident_Severity| Latitude|Light_Conditions|Local_Authority_(District)|Longitude|Number_of_Casualties|Number_of_Vehicles|       Police_Force|Road_Surface_Conditions|Road_Type|Speed_limit|               Time|Urban_or_Rural_Area|Vehicle_Type|
+--------------------+-------------+-----------+----------------+---------------+-----------------+---------+----------------+--------------------------+---------+--------------------+------------------+-------------------+-----------------------+---------+-----------+-------------------+-------------------+------------

Comme, on a  cles manquantes
On a utilise la jointure pour conserver toutes les données du CSV, avec une jointure left join 

Probleme:Les colonnes JSON seront remplies avec null pour les accidents sans correspondance.

In [7]:
#Fusion
df_combined1 = accidents_df.join(weather_df2, on="AccidentIndex", how="left")
print("Taille du DataFrame combiné :", df_combined1.count())
df_combined1.show()


Taille du DataFrame combiné : 3059


25/01/24 18:59:25 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------------------+-------------+-----------+----------------+---------------+-----------------+---------+----------------+--------------------------+---------+--------------------+------------------+-------------------+-----------------------+---------+-----------+-------------------+-------------------+------------+----------+------------------+--------------+------------+--------------+--------------+------------------+
|       AccidentIndex|Accident Date|Day_of_Week|Junction_Control|Junction_Detail|Accident_Severity| Latitude|Light_Conditions|Local_Authority_(District)|Longitude|Number_of_Casualties|Number_of_Vehicles|       Police_Force|Road_Surface_Conditions|Road_Type|Speed_limit|               Time|Urban_or_Rural_Area|Vehicle_Type|      Date|    DayLengthHours|MaxTemperature|MaxWindSpeed|MinTemperature|SolarRadiation|TotalPrecipitation|
+--------------------+-------------+-----------+----------------+---------------+-----------------+---------+----------------+--------------

In [None]:
# Sauvegarde du résultat
'''output_path = "/Users/madina/Desktop/datalake/Merged_Accidents_Weather_Data.csv"
df_combined1.coalesce(1).write.option("header", True).csv(output_path)
print(f"Les données fusionnées ont été sauvegardées dans le fichier : {output_path}")'''

[Stage 49:>                                                         (0 + 1) / 1]

Les données fusionnées ont été sauvegardées dans le fichier : /Users/madina/Desktop/datalake/Merged_Accidents_Weather_Data.csv


                                                                                

Pour resoudre le probleme avec cles manquantes, nous anvons ajouté des données par défaut pour les clés manquantes dans le JSON

In [None]:
from pyspark.sql.functions import lit, percentile_approx

# Calculer la médiane des colonnes pertinentes
default_values = weather_df2.select(
    percentile_approx("MaxTemperature", 0.5).alias("MaxTemperature"),
    percentile_approx("MinTemperature", 0.5).alias("MinTemperature"),
    percentile_approx("TotalPrecipitation", 0.5).alias("TotalPrecipitation"),
    percentile_approx("MaxWindSpeed", 0.5).alias("MaxWindSpeed"),
    percentile_approx("SolarRadiation", 0.5).alias("SolarRadiation"),
    percentile_approx("DayLengthHours", 0.5).alias("DayLengthHours")
).collect()[0]


                                                                                

In [13]:
# Créer un DataFrame pour les clés manquantes avec des valeurs remplies (médiane ici)
default_weather = missing_keys.withColumn("Date", lit("Default_Date")) \
    .withColumn("MaxTemperature", lit(default_values["MaxTemperature"])) \
    .withColumn("MinTemperature", lit(default_values["MinTemperature"])) \
    .withColumn("TotalPrecipitation", lit(default_values["TotalPrecipitation"])) \
    .withColumn("MaxWindSpeed", lit(default_values["MaxWindSpeed"])) \
    .withColumn("SolarRadiation", lit(default_values["SolarRadiation"])) \
    .withColumn("DayLengthHours", lit(default_values["DayLengthHours"]))


In [15]:
print(weather_df2.columns)
print(default_weather.columns)


['AccidentIndex', 'Date', 'DayLengthHours', 'MaxTemperature', 'MaxWindSpeed', 'MinTemperature', 'SolarRadiation', 'TotalPrecipitation']
['AccidentIndex', 'Accident Date', 'Day_of_Week', 'Junction_Control', 'Junction_Detail', 'Accident_Severity', 'Latitude', 'Light_Conditions', 'Local_Authority_(District)', 'Longitude', 'Number_of_Casualties', 'Number_of_Vehicles', 'Police_Force', 'Road_Surface_Conditions', 'Road_Type', 'Speed_limit', 'Time', 'Urban_or_Rural_Area', 'Vehicle_Type', 'Date', 'MaxTemperature', 'MinTemperature', 'TotalPrecipitation', 'MaxWindSpeed', 'SolarRadiation', 'DayLengthHours']


In [None]:
# Ajouter des colonnes manquantes à un DataFrame
missing_columns = set(weather_df2.columns) - set(default_weather.columns)
for col in missing_columns:
    default_weather = default_weather.withColumn(col, lit(None))

# Réorganiser les colonnes dans le même ordre
default_weather = default_weather.select(weather_df2.columns)


In [None]:
# Fusionner les lignes JSON existantes avec celles des clés manquantes (avec médiane)
complete_weather_df2 = weather_df2.union(default_weather)

In [18]:
# Effectuer la jointure avec toutes les clés
df_combined = accidents_df.join(complete_weather_df2, on="AccidentIndex", how="inner")

print("Taille finale du DataFrame combiné :", df_combined.count())
df_combined.show()


Taille finale du DataFrame combiné : 3059
+--------------------+-------------+-----------+----------------+---------------+-----------------+---------+----------------+--------------------------+---------+--------------------+------------------+-------------------+-----------------------+---------+-----------+-------------------+-------------------+------------+----------+------------------+--------------+------------+--------------+--------------+------------------+
|       AccidentIndex|Accident Date|Day_of_Week|Junction_Control|Junction_Detail|Accident_Severity| Latitude|Light_Conditions|Local_Authority_(District)|Longitude|Number_of_Casualties|Number_of_Vehicles|       Police_Force|Road_Surface_Conditions|Road_Type|Speed_limit|               Time|Urban_or_Rural_Area|Vehicle_Type|      Date|    DayLengthHours|MaxTemperature|MaxWindSpeed|MinTemperature|SolarRadiation|TotalPrecipitation|
+--------------------+-------------+-----------+----------------+---------------+-----------------

In [None]:
#Sauvegarder les données fusionnées
output_path = "/Users/madina/Desktop/datalake/Merged_Accidents_Weather_Data1.csv"
df_combined.coalesce(1).write.option("header", True).csv(output_path)
print(f"Les données fusionnées ont été sauvegardées dans le fichier : {output_path}")

Les données fusionnées ont été sauvegardées dans le fichier : /Users/madina/Desktop/datalake/Merged_Accidents_Weather_Data1.csv
