In [1]:
from pyspark.sql import SparkSession

# Start the Spark session for this analysis. getOrCreate() hands back a session that
# is already running in this kernel instead of building a second one.
spark = (
    SparkSession.builder
    .appName("TrafficWeatherAccidents")
    .getOrCreate()
)

spark

In [2]:
import pandas as pd
from pathlib import Path

# Raw inputs live in ../data, relative to the kernel's working directory.
data_dir = Path("..") / "data"

# Source workbooks and the CSV copies written from them below.
acc_excel_path = data_dir / "OPENDATA_MAP_2017-2024.xlsx"
weather_excel_path = data_dir / "synop_data.xls"
acc_csv_path = data_dir / "accidents.csv"
weather_csv_path = data_dir / "weather.csv"


def _excel_to_csv(excel_path, csv_path):
    """Load the first sheet of ``excel_path`` with pandas and save it to ``csv_path``.

    The CSV is written without the pandas index column. The loaded DataFrame is
    returned so it stays available for inspection in the notebook.
    """
    frame = pd.read_excel(excel_path)
    frame.to_csv(csv_path, index=False)
    return frame


# Accidents are converted before weather, so a failure on the weather workbook
# still leaves accidents.csv written.
df_acc = _excel_to_csv(acc_excel_path, acc_csv_path)
df_weather = _excel_to_csv(weather_excel_path, weather_csv_path)


In [5]:
# Point Spark at the CSVs produced by the conversion cell.
#
# Every path is derived from the same `data_dir` instead of hard-coding
# /home/jovyan/work/data. That absolute path only coincides with `../data` when the
# kernel's working directory is exactly one level below /home/jovyan/work. Anywhere
# else, Spark would silently read different (stale or missing) files from those just
# written by pandas.
#
# The paths are resolved to absolute, scheme-less local paths, the same form as
# before, so Spark resolves them the same way.
accidents_path = str(acc_csv_path.resolve())
traffic_path = str((data_dir / "traffic_live_geom.csv").resolve())
weather_path = str(weather_csv_path.resolve())


def _read_headered_csv(path):
    """Load a CSV whose first line is a header into a Spark DataFrame.

    Column types are inferred by Spark (``inferSchema``). Otherwise every column
    would be read as a string.
    """
    return spark.read.option("header", "true").option("inferSchema", "true").csv(path)


accidents_df = _read_headered_csv(accidents_path)
traffic_df = _read_headered_csv(traffic_path)
weather_df = _read_headered_csv(weather_path)

accidents_df.printSchema()
traffic_df.printSchema()
weather_df.printSchema()

root
 |-- DT_YEAR_COLLISION: integer (nullable = true)
 |-- DT_MONTH_COLLISION: integer (nullable = true)
 |-- DT_TIME: integer (nullable = true)
 |-- CD_NIS: integer (nullable = true)
 |-- TX_RGN_COLLISION_FR: string (nullable = true)
 |-- TX_RGN_COLLISION_NL: string (nullable = true)
 |-- TX_PROV_COLLISION_FR: string (nullable = true)
 |-- TX_PROV_COLLISION_NL: string (nullable = true)
 |-- TX_MUNTY_COLLISION_FR: string (nullable = true)
 |-- TX_MUNTY_COLLISION_NL: string (nullable = true)
 |-- MS_X_COORD: double (nullable = true)
 |-- MS_Y_COORD: double (nullable = true)
 |-- CD_CROSSWAY: integer (nullable = true)
 |-- TX_CROSSWAY_FR: string (nullable = true)
 |-- TX_CROSSWAY_NL: string (nullable = true)
 |-- CD_WEATHER: string (nullable = true)
 |-- TX_WEATHER_FR: string (nullable = true)
 |-- TX_WEATHER_NL: string (nullable = true)
 |-- CD_ROAD_CONDITION: string (nullable = true)
 |-- TX_ROAD_CONDITION_FR: string (nullable = true)
 |-- TX_ROAD_CONDITION_NL: string (nullable = true

In [7]:
# Quick check of how many rows each loaded source contributes.
for label, frame in (
    ("Accidents rows:", accidents_df),
    ("Traffic rows:", traffic_df),
    ("Weather rows:", weather_df),
):
    print(label, frame.count())

Accidents rows: 289532
Traffic rows: 89
Weather rows: 27


## 1. Verkeersongevallen

In [11]:
# Preview a handful of accident columns: collision year, month and hour code,
# province, weather and accident class (Dutch-language labels).
accident_preview_cols = [
    "DT_YEAR_COLLISION",
    "DT_MONTH_COLLISION",
    "DT_TIME",
    "TX_PROV_COLLISION_NL",
    "TX_WEATHER_NL",
    "TX_CLASS_ACCIDENTS_NL",
]
accidents_df.select(*accident_preview_cols).show(20, truncate=False)

+-----------------+------------------+-------+--------------------+-------------+---------------------+
|DT_YEAR_COLLISION|DT_MONTH_COLLISION|DT_TIME|TX_PROV_COLLISION_NL|TX_WEATHER_NL|TX_CLASS_ACCIDENTS_NL|
+-----------------+------------------+-------+--------------------+-------------+---------------------+
|2017             |1                 |1      |NULL                |Normaal      |met lichtgewonden    |
|2017             |1                 |0      |Provincie Antwerpen |Normaal      |met lichtgewonden    |
|2017             |1                 |1      |Provincie Limburg   |Onbekend     |met lichtgewonden    |
|2017             |1                 |10     |NULL                |Normaal      |met lichtgewonden    |
|2017             |1                 |7      |Provincie Antwerpen |Normaal      |met lichtgewonden    |
|2017             |1                 |13     |Provincie Antwerpen |Normaal      |met lichtgewonden    |
|2017             |1                 |13     |Provincie Antwerpe

In [12]:
# Number of accidents per collision year, oldest year first.
accidents_per_year = accidents_df.groupBy("DT_YEAR_COLLISION").count()
accidents_per_year.sort("DT_YEAR_COLLISION").show()

+-----------------+-----+
|DT_YEAR_COLLISION|count|
+-----------------+-----+
|             2017|38025|
|             2018|38453|
|             2019|37719|
|             2020|30251|
|             2021|34660|
|             2022|37650|
|             2023|36858|
|             2024|35916|
+-----------------+-----+



In [13]:
# Number of accidents per calendar month, pooled over all years.
accidents_per_month = accidents_df.groupBy("DT_MONTH_COLLISION").count()
accidents_per_month.sort("DT_MONTH_COLLISION").show()

+------------------+-----+
|DT_MONTH_COLLISION|count|
+------------------+-----+
|                 1|21021|
|                 2|20617|
|                 3|22235|
|                 4|21137|
|                 5|26315|
|                 6|28471|
|                 7|23502|
|                 8|23708|
|                 9|27926|
|                10|27882|
|                11|24430|
|                12|22288|
+------------------+-----+



In [14]:
# List every distinct DT_TIME code in ascending order. Up to 50 rows are shown,
# enough to reveal any codes outside the normal 0–23 hour range.
accidents_df.select("DT_TIME").dropDuplicates().sort("DT_TIME").show(n=50)

+-------+
|DT_TIME|
+-------+
|      0|
|      1|
|      2|
|      3|
|      4|
|      5|
|      6|
|      7|
|      8|
|      9|
|     10|
|     11|
|     12|
|     13|
|     14|
|     15|
|     16|
|     17|
|     18|
|     19|
|     20|
|     21|
|     22|
|     23|
|     99|
+-------+



In [15]:
from pyspark.sql.functions import col, when

# Derive consistently named time columns from the raw DT_* fields.
#
# year / month:
#   DT_YEAR_COLLISION and DT_MONTH_COLLISION are already inferred as integer (see
#   the printed schema). The cast only pins the type.
#
# hour:
#   The distinct DT_TIME values shown earlier are 0–23 plus the code 99, which is
#   treated as an unknown time. Only genuine hours 0–23 are kept; every other code
#   becomes NULL, not just 99, so an unexpected sentinel cannot pose as an hour in
#   the hourly analysis. `when` without `otherwise` yields NULL for unmatched rows,
#   including rows where DT_TIME itself is NULL.
#
# Each derived column is recomputed from its source column, so re-running this cell
# gives the same result even though it rebinds `accidents_df`.
accidents_df = (
    accidents_df
    .withColumn("year", col("DT_YEAR_COLLISION").cast("int"))
    .withColumn("month", col("DT_MONTH_COLLISION").cast("int"))
    .withColumn("hour", when(col("DT_TIME").between(0, 23), col("DT_TIME").cast("int")))
)

In [19]:
# Sanity-check the derived columns. Yearly and monthly totals should match the raw
# DT_* breakdowns; the NULL hour bucket holds rows for which no valid hour was
# derived. Hours need up to 25 rows (NULL + 0–23); the others use the default 20.
for derived_col, rows_to_show in (("year", 20), ("month", 20), ("hour", 25)):
    accidents_df.groupBy(derived_col).count().orderBy(derived_col).show(rows_to_show)

+----+-----+
|year|count|
+----+-----+
|2017|38025|
|2018|38453|
|2019|37719|
|2020|30251|
|2021|34660|
|2022|37650|
|2023|36858|
|2024|35916|
+----+-----+

+-----+-----+
|month|count|
+-----+-----+
|    1|21021|
|    2|20617|
|    3|22235|
|    4|21137|
|    5|26315|
|    6|28471|
|    7|23502|
|    8|23708|
|    9|27926|
|   10|27882|
|   11|24430|
|   12|22288|
+-----+-----+

+----+-----+
|hour|count|
+----+-----+
|NULL|    3|
|   0| 4517|
|   1| 3891|
|   2| 3123|
|   3| 2962|
|   4| 2972|
|   5| 4374|
|   6| 7057|
|   7|14588|
|   8|19120|
|   9|11931|
|  10|12950|
|  11|14616|
|  12|16952|
|  13|16307|
|  14|17528|
|  15|21189|
|  16|25290|
|  17|25298|
|  18|19478|
|  19|14034|
|  20|10282|
|  21| 8262|
|  22| 7074|
|  23| 5734|
+----+-----+



In [20]:
# Accidents per province (Dutch labels), most accidents first. Rows without a
# province value are grouped together under NULL.
# NOTE(review): the NULL bucket is presumably the Brussels region, which has no
# province. Confirm against TX_RGN_COLLISION_NL.
accident_province_counts = accidents_df.groupBy("TX_PROV_COLLISION_NL").count()
accident_province_counts.orderBy(accident_province_counts["count"].desc()).show(truncate=False)

+-------------------------+-----+
|TX_PROV_COLLISION_NL     |count|
+-------------------------+-----+
|Provincie Antwerpen      |53106|
|Provincie Oost-Vlaanderen|46193|
|Provincie West-Vlaanderen|33293|
|NULL                     |30022|
|Provincie Henegouwen     |28397|
|Provincie Luik           |25907|
|Provincie Vlaams-Brabant |23990|
|Provincie Limburg        |21766|
|Provincie Namen          |11642|
|Provincie Waals-Brabant  |8189 |
|Provincie Luxemburg      |7027 |
+-------------------------+-----+



In [21]:
# Accidents per reported weather condition (Dutch labels), most frequent first.
# Combined conditions appear as separate labels joined with "+", e.g. "Regenval+Sneeuwval".
accident_weather_counts = accidents_df.groupBy("TX_WEATHER_NL").count()
accident_weather_counts.orderBy(accident_weather_counts["count"].desc()).show(truncate=False)

+---------------------------------------------+------+
|TX_WEATHER_NL                                |count |
+---------------------------------------------+------+
|Normaal                                      |226636|
|Regenval                                     |29706 |
|Onbekend                                     |27763 |
|Sneeuwval                                    |1466  |
|Andere (dikke rook,...)                      |1238  |
|Mist (zichtbaarheid minder dan 100m)         |1193  |
|Sterke wind, rukwind                         |814   |
|Regenval+Sterke wind, rukwind                |416   |
|Hagelbui                                     |206   |
|Regenval+Sneeuwval                           |38    |
|Regenval+Mist (zichtbaarheid minder dan 100m)|15    |
|Regenval+Hagelbui                            |15    |
|Sterke wind, rukwind+Sneeuwval               |11    |
|Sneeuwval+Hagelbui                           |11    |
|Regenval+Andere (dikke rook,...)             |3     |
|Sterke wi