In [1]:
# =============================================================================
# Projet : Analyse des données criminelles avec PySpark
# But : Analyse complète des crimes de Chicago (~8 millions de lignes)
#       dans le cadre de mon autoformation Big Data et Spark
#       avec application du maximum de fonctions PySpark (niveau débutant → avancé)
# Auteur : Mr. Ayoub BAMOUH
# Fonction : Data Engineer & Biomedical Engineer
# =============================================================================

In [2]:
# =========================================================
# 1. Connexion Google Colab avec Google Drive
# =========================================================

In [3]:
from google.colab import drive         # Import du module Google Colab Drive pour connecter ton Drive
drive.mount('/content/drive')          # Monte ton Google Drive dans /content/drive pour accéder aux fichiers

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# =========================================================
# 2. Installation & Configuration Spark
# =========================================================

In [5]:
!pip install pyspark -q                # Installe silencieusement PySpark (librairie principale Spark pour Python)
!pip install findspark -q              # Installe findspark pour faciliter l’intégration Spark ↔ Python

In [6]:
from pyspark.sql import SparkSession   # Import de SparkSession (point d’entrée de toute application Spark)
# Création de la session Spark avec un nom personnalisé pour ton projet
spark = SparkSession.builder.appName("Spark_Crime_Data_Insights_Project").getOrCreate()

In [7]:
# =========================================================
# 3. Import des librairies nécessaires
# =========================================================

In [8]:
from pyspark.sql.functions import *    # Import de toutes les fonctions SQL (ex: col, lit, to_date, to_timestamp…)
# Types de données explicites pour bien typer les colonnes (entiers, booléens, flottants, longs…)
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, LongType

In [9]:
# =========================================================
# 4. Ingestion des données
# =========================================================

In [10]:
# Téléchargement initial du dataset depuis le site officiel (à exécuter une seule fois)
# !wget -O /content/drive/MyDrive/PySpark-Projet-Pratique-2025/crimes_data_set.csv https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

In [11]:
# Lecture du fichier CSV avec Spark (en précisant header=True pour garder les noms de colonnes)
df = spark.read.csv('/content/drive/MyDrive/PySpark-Projet-Pratique-2025/crimes_data_set.csv', header=True)

In [12]:
print("Nombre de lignes :", df.count())   # Affiche le nombre total de lignes (≈ 8 millions)
df.printSchema()                          # Affiche le schéma actuel (toutes colonnes = string au départ)
df.show(5)                                # Affiche les 5 premières lignes pour aperçu

Nombre de lignes : 8407333
root
 |-- ID: string (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: string (nullable = true)
 |-- Domestic: string (nullable = true)
 |-- Beat: string (nullable = true)
 |-- District: string (nullable = true)
 |-- Ward: string (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: string (nullable = true)
 |-- Y Coordinate: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Location: string (nullable = true)

+--------+-----------+--------------------+--------------------+----+----------

In [13]:
# =========================================================
# 5. Nettoyage & typage correct des colonnes
# =========================================================

In [14]:
# Conversion des colonnes mal typées : string → type correct
df = df.withColumn("ID", col("ID").cast(LongType())) \
       .withColumn("Date", to_timestamp(col("Date"), "MM/dd/yyyy hh:mm:ss a")) \
       .withColumn("Arrest", col("Arrest").cast(BooleanType())) \
       .withColumn("Domestic", col("Domestic").cast(BooleanType())) \
       .withColumn("Beat", col("Beat").cast(IntegerType())) \
       .withColumn("District", col("District").cast(IntegerType())) \
       .withColumn("Ward", col("Ward").cast(IntegerType())) \
       .withColumn("Community Area", col("Community Area").cast(IntegerType())) \
       .withColumn("Year", col("Year").cast(IntegerType())) \
       .withColumn("X Coordinate", col("X Coordinate").cast(DoubleType())) \
       .withColumn("Y Coordinate", col("Y Coordinate").cast(DoubleType())) \
       .withColumn("Latitude", col("Latitude").cast(DoubleType())) \
       .withColumn("Longitude", col("Longitude").cast(DoubleType()))

df.printSchema()   # Vérifie le schéma après typage correct

root
 |-- ID: long (nullable = true)
 |-- Case Number: string (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Block: string (nullable = true)
 |-- IUCR: string (nullable = true)
 |-- Primary Type: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Location Description: string (nullable = true)
 |-- Arrest: boolean (nullable = true)
 |-- Domestic: boolean (nullable = true)
 |-- Beat: integer (nullable = true)
 |-- District: integer (nullable = true)
 |-- Ward: integer (nullable = true)
 |-- Community Area: integer (nullable = true)
 |-- FBI Code: string (nullable = true)
 |-- X Coordinate: double (nullable = true)
 |-- Y Coordinate: double (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Updated On: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [15]:
# =========================================================
# 6. Exploration des données
# =========================================================

In [16]:
print("Nombre total de crimes :", df.count())   # Vérifie à nouveau le nombre total de lignes
print("Colonnes :", df.columns)                 # Affiche toutes les colonnes disponibles
df.select("Date", "Primary Type", "Arrest", "Location").show(10)   # Exemple : aperçu de certaines colonnes

Nombre total de crimes : 8407333
Colonnes : ['ID', 'Case Number', 'Date', 'Block', 'IUCR', 'Primary Type', 'Description', 'Location Description', 'Arrest', 'Domestic', 'Beat', 'District', 'Ward', 'Community Area', 'FBI Code', 'X Coordinate', 'Y Coordinate', 'Year', 'Updated On', 'Latitude', 'Longitude', 'Location']
+-------------------+--------------------+------+--------------------+
|               Date|        Primary Type|Arrest|            Location|
+-------------------+--------------------+------+--------------------+
|2022-07-29 03:39:00|OFFENSE INVOLVING...|  true|                NULL|
|2023-01-03 16:44:00|           NARCOTICS|  true|                NULL|
|2020-08-10 09:45:00|             ROBBERY|  true|(41.908417822, -8...|
|2017-08-26 10:00:00| CRIM SEXUAL ASSAULT| false|                NULL|
|2023-09-06 17:00:00|     CRIMINAL DAMAGE| false|(41.886018055, -8...|
|2023-09-06 11:00:00|               THEFT| false|(41.871834768, -8...|
|2019-05-21 08:20:00|            BURGLARY| f

In [17]:
# Ajout d’une colonne constante "Dataset_Version"
df = df.withColumn("Dataset_Version", lit("v1"))
df.select("Dataset_Version").distinct().show()  # Vérifie que la colonne a bien été ajoutée

+---------------+
|Dataset_Version|
+---------------+
|             v1|
+---------------+



In [18]:
# Crimes sur une date précise (exemple Noël 2024)
one_day = df.filter(to_date(col("Date")) == lit("2024-12-25"))
print("Crimes du 25/12/2024 :", one_day.count())

Crimes du 25/12/2024 : 507


In [19]:
# Vérifie le nombre de types de crimes différents
print("Types de crimes :", df.select("Primary Type").distinct().count())

Types de crimes : 34


In [20]:
# =========================================================
# 7. Analyses simples
# =========================================================

In [21]:
# Nombre de crimes par type, trié par ordre décroissant
df.groupBy("Primary Type").count().orderBy("count", ascending=False).show(10)

+-------------------+-------+
|       Primary Type|  count|
+-------------------+-------+
|              THEFT|1784548|
|            BATTERY|1531956|
|    CRIMINAL DAMAGE| 956023|
|          NARCOTICS| 763256|
|            ASSAULT| 562825|
|      OTHER OFFENSE| 524112|
|           BURGLARY| 444616|
|MOTOR VEHICLE THEFT| 429512|
| DECEPTIVE PRACTICE| 386844|
|            ROBBERY| 314226|
+-------------------+-------+
only showing top 10 rows



In [22]:
# Nombre de crimes par type de lieu
df.groupBy("Location Description").count().orderBy("count", ascending=False).show(10)

+--------------------+-------+
|Location Description|  count|
+--------------------+-------+
|              STREET|2196511|
|           RESIDENCE|1381507|
|           APARTMENT| 997950|
|            SIDEWALK| 761589|
|               OTHER| 269939|
|PARKING LOT/GARAG...| 202930|
|               ALLEY| 187279|
|  SMALL RETAIL STORE| 168813|
|SCHOOL, PUBLIC, B...| 146368|
|          RESTAURANT| 141203|
+--------------------+-------+
only showing top 10 rows



In [23]:
# Pourcentage des crimes ayant mené à une arrestation
Total = df.filter(col("Arrest") == "true").count()
Arrests = df.select("Arrest").count()
Taux = (Total / Arrests)*100
print("Taux arrestation :", Taux, "%")

Taux arrestation : 25.247768822764606 %


In [24]:
# =========================================================
# 8. Analyses avancées
# =========================================================

In [25]:
# Pivot table : nombre de crimes par année et par statut d’arrestation (True/False)
df.groupBy("Year").pivot("Arrest").count().orderBy("Year").show()

+----+------+------+
|Year| false|  true|
+----+------+------+
|2001|344008|141944|
|2002|345249|141581|
|2003|334391|141604|
|2004|324734|144705|
|2005|312857|140928|
|2006|312769|135429|
|2007|305220|131885|
|2008|317183|110028|
|2009|282011|110848|
|2010|269993|100562|
|2011|255733| 96305|
|2012|245683| 90684|
|2013|221025| 86578|
|2014|196203| 79663|
|2015|194795| 70071|
|2016|216864| 53065|
|2017|216563| 52693|
|2018|215193| 53911|
|2019|205356| 56292|
|2020|178443| 34195|
+----+------+------+
only showing top 20 rows



In [26]:
# Window function : obtenir le dernier crime par district (ROW_NUMBER)
from pyspark.sql.window import Window
w = Window.partitionBy("District").orderBy(col("Date").desc())
df.withColumn("rn", row_number().over(w)) \
  .filter("rn = 1") \
  .select("District","Date","Primary Type").show(10)

+--------+-------------------+------------------+
|District|               Date|      Primary Type|
+--------+-------------------+------------------+
|    NULL|2017-12-14 20:10:00|           BATTERY|
|       1|2025-09-17 00:00:00|   CRIMINAL DAMAGE|
|       2|2025-09-17 00:00:00|     OTHER OFFENSE|
|       3|2025-09-17 00:00:00|     OTHER OFFENSE|
|       4|2025-09-16 22:10:00|           BATTERY|
|       5|2025-09-16 23:54:00|           BATTERY|
|       6|2025-09-17 00:00:00|   CRIMINAL DAMAGE|
|       7|2025-09-16 23:30:00|             THEFT|
|       8|2025-09-17 00:00:00|DECEPTIVE PRACTICE|
|       9|2025-09-16 23:36:00| WEAPONS VIOLATION|
+--------+-------------------+------------------+
only showing top 10 rows



In [27]:
# Vérifier les valeurs nulles par colonne
print("Valeurs nulles par colonne :")
for c in df.columns:
    print(c, df.filter(col(c).isNull()).count())

Valeurs nulles par colonne :
ID 0
Case Number 0
Date 0
Block 0
IUCR 0
Primary Type 0
Description 0
Location Description 14872
Arrest 0
Domestic 0
Beat 0
District 47
Ward 614822
Community Area 613687
FBI Code 0
X Coordinate 93694
Y Coordinate 93694
Year 0
Updated On 0
Latitude 93694
Longitude 93694
Location 93694
Dataset_Version 0


In [28]:
# =========================================================
# 9. SQL avec Spark
# =========================================================

In [29]:
# Création d’une vue SQL temporaire pour interroger le DataFrame en SQL
df.createOrReplaceTempView("crimes")

In [30]:
# Exemple : crimes menant à une arrestation par année et type
spark.sql("""
SELECT Year, `Primary Type`, COUNT(*) as total
FROM crimes
WHERE Arrest = true
GROUP BY Year, `Primary Type`
ORDER BY total DESC
""").show(10)

+----+------------+-----+
|Year|Primary Type|total|
+----+------------+-----+
|2004|   NARCOTICS|57034|
|2005|   NARCOTICS|56121|
|2006|   NARCOTICS|55236|
|2003|   NARCOTICS|54283|
|2007|   NARCOTICS|53251|
|2002|   NARCOTICS|51781|
|2001|   NARCOTICS|50560|
|2008|   NARCOTICS|45482|
|2010|   NARCOTICS|43299|
|2009|   NARCOTICS|43215|
+----+------------+-----+
only showing top 10 rows



In [31]:
# =========================================================
# 10. UDF : fonction personnalisée
# =========================================================

In [32]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Fonction Python pour classifier les crimes en catégories simples
def classify_crime(desc):
    if "THEFT" in str(desc).upper():
        return "Vol"
    elif "BATTERY" in str(desc).upper():
        return "Agression"
    return "Autre"

# Transformation de la fonction Python → UDF utilisable par Spark
udf_classify = udf(classify_crime, StringType())

# Application : création d’une nouvelle colonne "Crime_Category"
df = df.withColumn("Crime_Category", udf_classify(col("Primary Type")))
df.select("Primary Type", "Crime_Category").distinct().show(10)

+--------------------+--------------+
|        Primary Type|Crime_Category|
+--------------------+--------------+
|CRIMINAL SEXUAL A...|         Autre|
|PUBLIC PEACE VIOL...|         Autre|
|     CRIMINAL DAMAGE|         Autre|
|           RITUALISM|         Autre|
|        NON-CRIMINAL|         Autre|
|             ROBBERY|         Autre|
| CRIM SEXUAL ASSAULT|         Autre|
|        PROSTITUTION|         Autre|
|CONCEALED CARRY L...|         Autre|
|LIQUOR LAW VIOLATION|         Autre|
+--------------------+--------------+
only showing top 10 rows



In [33]:
# =========================================================
# 11. Sauvegarde des résultats
# =========================================================

In [34]:
# Sauvegarde du DataFrame nettoyé en format Parquet (format optimisé pour Big Data)
#df.write.mode("overwrite").parquet("/content/drive/MyDrive/crimes_cleaned.parquet")