In [2]:
pip install --upgrade apache-sedona

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator

# Creation de la session Spark avec Sedona
spark = SparkSession.builder \
    .appName("SedonaApp") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator") \
    .config("spark.jars.packages", "org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.2,org.datasyslab:geotools-wrapper:1.4.0-28.2") \
    .getOrCreate()


SedonaRegistrator.registerAll(spark)

# Affichage des versions
print(f"Spark version: {spark.version}")
print("Scala version JVM:", spark.sparkContext._jvm.scala.util.Properties.versionString())


  SedonaRegistrator.registerAll(spark)


Spark version: 3.5.4
Scala version JVM: version 2.12.18


In [4]:
df = spark.read.csv("Accident.csv", header=True, inferSchema=True)

In [6]:
df.printSchema()

root
 |-- Quarter: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Total_Crashes: integer (nullable = true)
 |-- Num_Injured: integer (nullable = true)
 |-- Num_Killed: integer (nullable = true)
 |-- Total_Vehicles_Involved: integer (nullable = true)
 |-- SPV: integer (nullable = true)
 |-- DAD: integer (nullable = true)
 |-- PWR: integer (nullable = true)
 |-- FTQ: integer (nullable = true)
 |-- Other_Factors: integer (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)



In [8]:
from pyspark.sql.functions import expr

df_geo = df.withColumn(
    "geom",
    expr("ST_Point(cast(Longitude as Decimal(24,20)), cast(Latitude as Decimal(24,20)))")
)

In [10]:
from pyspark.sql.functions import monotonically_increasing_id

df_with_id = df_geo.withColumn("id", monotonically_increasing_id())
df_with_id.createOrReplaceTempView("accidents_geom")


In [12]:
#### Crrer une vue
df_geo.createOrReplaceTempView("accidents")

In [41]:
from pyspark.sql.functions import monotonically_increasing_id

df_geo = df_geo.withColumn("id", monotonically_increasing_id())
df_geo.createOrReplaceTempView("accidents_geom")


### Les lieux où plusieurs accidents sont géographiquement très proches les uns des autres (Analyse de proximité / regroupement spatial)

In [44]:
query_clusters = """
SELECT DISTINCT
    a1.State AS State,
    a1.Quarter AS Quarter,
    a1.Latitude AS Latitude,
    a1.Longitude AS Longitude,
    a1.Num_Killed AS Deaths,
    a1.Num_Injured AS Injured
FROM accidents_geom a1
JOIN accidents_geom a2
  ON a1.id != a2.id
WHERE ST_Intersects(
    ST_Buffer(a1.geom, 0.05),
    a2.geom
)
"""

df_clusters = spark.sql(query_clusters)
df_clusters.show(truncate=False)


+-----------+-------+----------+----------+------+-------+
|State      |Quarter|Latitude  |Longitude |Deaths|Injured|
+-----------+-------+----------+----------+------+-------+
|Cross River|Q2 2021|5.8671966 |8.5204774 |5     |74     |
|Gombe      |Q2 2022|10.4304018|11.2065408|35    |199    |
|Benue      |Q4 2022|7.3505747 |8.7772877 |24    |225    |
|Delta      |Q3 2021|5.5273061 |6.1784167 |23    |122    |
|Benue      |Q2 2022|7.3505747 |8.7772877 |51    |273    |
|Oyo        |Q1 2024|8.2151249 |3.5642897 |68    |298    |
|Plateau    |Q2 2021|9.0583446 |9.6826289 |23    |283    |
|Oyo        |Q1 2022|8.2151249 |3.5642897 |73    |540    |
|Jigawa     |Q2 2022|12.3252362|9.5103296 |46    |333    |
|Kwara      |Q2 2022|8.8367891 |4.6688487 |67    |342    |
|Akwa Ibom  |Q1 2024|4.9906379 |7.7966205 |5     |23     |
|Plateau    |Q1 2024|9.0583446 |9.6826289 |15    |185    |
|Delta      |Q1 2021|5.5273061 |6.1784167 |51    |196    |
|Kaduna     |Q4 2023|10.5182899|7.4359863 |150   |889   

La requête identifie les lieux où plusieurs accidents sont géographiquement très proches les uns des autres (dans un rayon d’environ 5 km). Cela peut signaler :

Un point noir routier (zone à haut risque).

Une mauvaise infrastructure (routes étroites, feux absents, etc.).

Un problème de signalisation ou d’éclairage.

Une concentration de population et de trafic (zones urbaines).

### Identifier les zones avec un taux de mortalite elever pour orienter les autorites (Agrégation spatiale)

In [75]:
query_mortalite_spatiale = """
SELECT
  FLOOR(Latitude * 2) / 2 AS lat_bin,
  FLOOR(Longitude * 2) / 2 AS lon_bin,
  COUNT(*) AS nb_accidents,
  SUM(Num_Killed) AS total_morts,
  ROUND(SUM(Num_Killed) / COUNT(*), 2) AS taux_mortalite
FROM accidents
WHERE Num_Killed IS NOT NULL AND Num_Killed < 50
GROUP BY lat_bin, lon_bin
ORDER BY taux_mortalite DESC
"""
df_mortalite_spatiale = spark.sql(query_mortalite_spatiale)
df_mortalite_spatiale.show(truncate=False)


+-------+-------+------------+-----------+--------------+
|lat_bin|lon_bin|nb_accidents|total_morts|taux_mortalite|
+-------+-------+------------+-----------+--------------+
|8.0    |3.5    |1           |45         |45.0          |
|9.5    |5.5    |1           |41         |41.0          |
|7.0    |5.0    |8           |319        |39.88         |
|10.5   |10.0   |4           |153        |38.25         |
|12.0   |9.5    |10          |378        |37.8          |
|9.0    |7.0    |1           |37         |37.0          |
|11.5   |8.5    |4           |140        |35.0          |
|7.5    |6.5    |3           |103        |34.33         |
|6.0    |3.0    |12          |409        |34.08         |
|8.0    |8.0    |5           |166        |33.2          |
|7.5    |4.0    |8           |262        |32.75         |
|12.5   |7.5    |13          |404        |31.08         |
|6.5    |5.5    |11          |338        |30.73         |
|8.5    |4.5    |3           |88         |29.33         |
|12.0   |11.5 

L’analyse spatiale du taux de mortalité routière par cellule géographique (définie par des tranches de 0.5° de latitude et de longitude) permet de localiser les zones les plus critiques. Par exemple, la cellule (8.0, 3.5) présente un taux de mortalité de 45 décès pour un seul accident, ce qui indique une extrême gravité. D'autres zones comme (7.0, 5.0) ou (12.0, 9.5) enregistrent également des taux élevés, révélant des points noirs potentiels en matière de sécurité routière. Ce type d’analyse aide à orienter les autorités vers des zones prioritaires d’intervention, afin de renforcer la prévention et les infrastructures routières.

### Comparaison Urbain (Lagos) vs Rurale

In [84]:
query6 = """
SELECT
    CASE
        WHEN ST_Within(geom, ST_GeomFromText('POLYGON((3.2 6.3, 3.2 6.7, 3.6 6.7, 3.6 6.3, 3.2 6.3))'))
        THEN 'Zone urbaine (Lagos)'
        ELSE 'Zone rurale'
    END AS Zone,
    COUNT(*) AS Nombre_Accidents,
    SUM(Num_Killed) AS Morts,
    SUM(Num_Injured) AS Blesses
FROM accidents_geom
GROUP BY Zone
"""
df_zones = spark.sql(query6)
df_zones.show()


+--------------------+----------------+-----+-------+
|                Zone|Nombre_Accidents|Morts|Blesses|
+--------------------+----------------+-----+-------+
|         Zone rurale|             504|20513| 124203|
|Zone urbaine (Lagos)|              14|  518|   3034|
+--------------------+----------------+-----+-------+



### Gravité moyenne par État (morts/accident)

In [55]:
query10 = """
SELECT
    State,
    ROUND(SUM(Num_Killed) / COUNT(*), 2) AS Morts_par_Accident,
    ROUND(SUM(Num_Injured) / COUNT(*), 2) AS Blesses_par_Accident
FROM accidents_geom
GROUP BY State
ORDER BY Morts_par_Accident DESC
"""
df_gravite = spark.sql(query10)
df_gravite.show()


+--------+------------------+--------------------+
|   State|Morts_par_Accident|Blesses_par_Accident|
+--------+------------------+--------------------+
|  Kaduna|            160.14|              804.57|
|    Ogun|             100.0|              622.36|
|   Niger|             96.86|              509.43|
|  Bauchi|              82.5|              492.07|
|     FCT|             77.64|              628.93|
|     Oyo|              75.0|              392.57|
|    Kano|             74.14|              320.57|
|    Kogi|             64.14|              370.29|
|   Kwara|              58.0|              307.14|
|Nasarawa|             53.21|              528.64|
|    Ondo|             48.79|              271.14|
|    Osun|             47.21|              295.64|
|  Jigawa|             44.43|               380.5|
|   Lagos|              37.0|              216.71|
|     Edo|             35.21|              149.86|
|    Yobe|             34.64|               229.0|
| Katsina|             34.36|  

Cette analyse met en évidence la gravité moyenne des accidents dans chaque État nigérian, en calculant le nombre moyen de morts et de blessés par accident. Des États comme Kaduna, Ogun et Niger présentent des taux exceptionnellement élevés, suggérant des accidents très graves . Lagos, bien que très accidentogène, affiche une gravité moyenne plus faible, ce qui pourrait refléter une meilleure prise en charge. 

### Regroupement spatial (Clustering)

In [69]:
query = """
SELECT grid_cell, COUNT(*) AS nb_accidents
FROM (
  SELECT ST_Pixelize(geom, 0.1) AS grid_cell
  FROM accidents
)
GROUP BY grid_cell
ORDER BY nb_accidents DESC
"""

df_pix = spark.sql(query)
df_pix.show(truncate=False)


AnalysisException: [UNRESOLVED_ROUTINE] Cannot resolve function `ST_Pixelize` on search path [`system`.`builtin`, `system`.`session`, `spark_catalog`.`default`].; line 4 pos 9

#### les zones les plus accidentogenes

In [67]:
query = """
SELECT 
    FLOOR(Latitude * 100) / 100 AS lat_bin,
    FLOOR(Longitude * 100) / 100 AS lon_bin,
    State,
    COUNT(*) AS nombre_accidents,
    SUM(Num_Killed) AS morts,
    SUM(Num_Injured) AS blesses
FROM accidents
GROUP BY lat_bin, lon_bin, state
ORDER BY nombre_accidents DESC
LIMIT 10
"""

result = spark.sql(query)
result.show()


+-------+-------+-----------+----------------+-----+-------+
|lat_bin|lon_bin|      State|nombre_accidents|morts|blesses|
+-------+-------+-----------+----------------+-----+-------+
|    6.6|   5.97|        Edo|              14|  493|   2098|
|  12.56|   7.62|    Katsina|              14|  481|   2137|
|   4.76|   6.02|    Bayelsa|              14|   57|    299|
|   6.19|   8.03|     Ebonyi|              14|  265|   1237|
|  10.43|   11.2|      Gombe|              14|  375|   3590|
|   7.02|   5.05|       Ondo|              14|  683|   3796|
|  10.51|   7.43|     Kaduna|              14| 2242|  11264|
|   5.86|   8.52|Cross River|              14|  217|   1025|
|   6.45|   3.39|      Lagos|              14|  518|   3034|
|  12.18|   13.3|      Borno|              14|  219|   1638|
+-------+-------+-----------+----------------+-----+-------+



In [108]:
query3 = """
SELECT
    FLOOR(a.Latitude * 100) / 100 AS lat_bin,
    FLOOR(a.Longitude * 100) / 100 AS lon_bin,
    COUNT(*) AS nb_accidents_dans_buffer
FROM accidents a
GROUP BY lat_bin, lon_bin
ORDER BY nb_accidents_dans_buffer DESC
LIMIT 10
"""

result3 = spark.sql(query3)
result3.show()


+-------+-------+------------------------+
|lat_bin|lon_bin|nb_accidents_dans_buffer|
+-------+-------+------------------------+
|   5.45|   7.51|                      14|
|    6.6|   5.97|                      14|
|   8.21|   3.56|                      14|
|   8.01|  10.73|                      14|
|   4.76|   6.02|                      14|
|   9.05|   9.68|                      14|
|   5.52|   6.17|                      14|
|  13.06|   5.31|                      14|
|   6.19|   8.03|                      14|
|   7.54|   4.49|                      14|
+-------+-------+------------------------+

