# **Projet Spark**
## Analyse du trafic des taxis à New York City
### Mastering Big Data avec Apache Spark

**Introduction**

Ce projet est une occasion pour vous familiariser avec la conception et la mise en œuvre de pipelines de traitement et d’analyse de données à grande échelle à l’aide d’Apache Spark.

**Objectif du projet**

L’objectif de ce projet est de concevoir un pipeline de traitement et d’analyse de données à l’aide d’Apache Spark afin d’explorer le trafic des taxis dans la ville de New York.

Vous devez effectuer les étapes suivantes : ingestion, nettoyage, transformation et analyse exploratoire des données afin d’identifier des tendances spatio-temporelles, des comportements de paiement et des opportunités d’optimisation (ex. covoiturage urbain).

**Jeu de données** : NYC Taxi Trip Records
Les données proviennent du site officiel de la New York City Taxi and Limousine

Commission (TLC) :
https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

Chaque enregistrement correspond à un trajet individuel effectué par un taxi.
Consultez le fichier annexe DataDictionary–YellowTaxiTripRecords.pdf contenant la description des données.

| Nom de la colonne                  | Description                                                                 |
|----------------------------------|----------------------------------------------------------------------------|
| tpep_pickup_datetime              | Date et heure de prise en charge du passager                                |
| tpep_dropoff_datetime             | Date et heure de dépôt du passager                                         |
| passenger_count                   | Nombre de passagers transportés                                            |
| trip_distance                     | Distance totale du trajet (en miles)                                        |
| pickup_longitude / latitude       | Coordonnées géographiques du point de départ                                |
| dropoff_longitude / latitude      | Coordonnées géographiques du point d’arrivée                                |
| fare_amount                       | Montant de la course (hors taxes et suppléments)                             |
| extra, mta_tax, tip_amount, tolls_amount, total_amount | Différentes composantes du tarif total payé                       |
| payment_type                      | Type de paiement (1 = carte, 2 = espèces, etc.)                             |
| RatecodeID                         | Code tarifaire (tarif standard, aéroport, etc.)                             |
| VendorID                           | Identifiant du fournisseur de données (taxi jaune, vert, etc.)             |


**Fichiers de zones TLC**

Afin de relier les identifiants PULocationID et DOLocationID à leurs noms réels de zones géographiques, il est nécessaire d’utiliser les fichiers CSV complémentaires fournis par la TLC :

— taxi_zone_lookup.csv : fichier contenant la correspondance entre les identifiantsde zones et leur description.

Contenu du fichier taxi_zone_lookup.csv :

| Nom de la colonne | Description                                                                 |
|-----------------|----------------------------------------------------------------------------|
| LocationID       | Identifiant unique de la zone (correspond à PULocationID / DOLocationID)  |
| Borough          | Nom du borough (district administratif, ex. Manhattan, Brooklyn, Queens...) |
| Zone             | Nom spécifique de la zone ou du quartier                                    |
| service_zone     | Type de service (Yellow, Green, etc.)                                       |


**Lien officiel de téléchargement :**

https://www1.nyc.gov/assets/tlc/downloads/pdf/taxi_zone_lookup.csv

Utilisation attendue :

— Charger ce fichier dans Spark.

— Joindre les zones via :

— PULocationID = LocationID pour obtenir la zone de départ.

— DOLocationID = LocationID pour obtenir la zone d’arrivée.

— Remplacer les identifiants numériques par les noms de zones dans les analyses et visualisations

### Phase 1 : Ingestion et exploration initiale

**Objectifs :**

- Charger le jeu de données dans un DataFrame Spark

- Examiner le schéma et les types de colonnes

- Identifier les valeurs manquantes ou aberrantes

**Questions à traiter :**

- Combien de trajets sont disponibles dans l’échantillon?

- Quelle est la période couverte?

- Ya-t-il des valeurs nulles ou incohérentes (distance = 0, tarif négatif, etc.)?

In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("NYC Taxi Analysis")
  .master("local[*]") 
  .getOrCreate()

spark.sparkContext.setLogLevel("ERROR") 


spark = org.apache.spark.sql.SparkSession@3319a314


org.apache.spark.sql.SparkSession@3319a314

In [19]:
val taxiDF = spark.read
  .option("header", "true")       
  .option("inferSchema", "true")  
  .parquet("yellow_tripdata_2025-11.parquet")

taxiDF.printSchema()
taxiDF.show(5)


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+------

taxiDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 18 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 18 more fields]

**Interpretation :**
Le schéma du DataFrame montre un jeu de données riche et détaillé, composé de :

- Variables temporelles :

    - tpep_pickup_datetime, tpep_dropoff_datetime (type timestamp)

    → adaptées à l’analyse spatio-temporelle et au calcul de durées.

- Variables numériques continues :

    -trip_distance, fare_amount, tip_amount, total_amount, etc.

    → essentielles pour l’analyse des coûts, distances et vitesses.

- Variables catégorielles / identifiants :

    - VendorID, RatecodeID, payment_type, PULocationID, DOLocationID.

- Suppléments tarifaires récents :

    - congestion_surcharge, Airport_fee, cbd_congestion_fee

    → indiquent une évolution des politiques de tarification liées au trafic urbain.


→ La structure des données est cohérente avec les objectifs du projet. Elle permet :

- une analyse temporelle fine,

- une étude des comportements de paiement,

- et une analyse spatiale via les zones TLC.

In [3]:
// Nombre de trajets
val totalTrips = taxiDF.count()
println(s"Nombre total de trajets : $totalTrips")


totalTrips = 4181444


Nombre total de trajets : 4181444


4181444

**Interpretations:**

Ce volume important confirme qu’il s’agit bien d’un problème Big Data, nécessitant un moteur distribué comme Apache Spark.
Il permet d’obtenir des résultats statistiques robustes, mais implique également la présence potentielle d’erreurs et d’anomalies à grande échelle.

In [4]:
// Période couverte (date min et max)
val period = taxiDF.select(
  min("tpep_pickup_datetime").alias("min_pickup"),
  max("tpep_dropoff_datetime").alias("max_dropoff")
).show()


+-------------------+-------------------+
|         min_pickup|        max_dropoff|
+-------------------+-------------------+
|2008-12-31 23:04:21|2025-12-01 21:41:00|
+-------------------+-------------------+



period = ()


()

**Interpretations:**

Date minimale de prise en charge :
2008-12-31 23:04:21

Date maximale de dépose :
2025-12-01 21:41:00

→ La période couverte est anormalement large (plus de 15 ans), ce qui est incohérent avec un fichier mensuel ou annuel de la TLC.

In [5]:
// Vérifier les valeurs nulles
taxiDF.select(
  taxiDF.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*
).show()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       0|                   0|                    0|        1014740|            0|   1014740|           1014740|           0|    

**Interprétation :**

- Les résultats montrent environ 1 014 740 valeurs nulles dans plusieurs colonnes clés, notamment :
    
    - passenger_count
    
    - RatecodeID
    
    - store_and_fwd_flag
    
    - congestion_surcharge
    
    - Airport_fee

- Certaines colonnes ne sont pas systématiquement renseignées, en particulier les surtaxes spécifiques (aéroport, congestion).

- Les valeurs manquantes peuvent dépendre :

    - du type de trajet,
    
    - de la zone,
    
    - ou de l’année de collecte.

In [6]:
// Vérifier les valeurs aberrantes
// Distances ou tarifs négatifs
taxiDF.filter(col("trip_distance") <= 0 || col("fare_amount") < 0).show(10)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|cbd_congestion_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+
|       2| 2025-11-01 00:41:35|  2025-11-01 00:45:16|              1|         0.01|         1|                 N|         186|    

In [7]:
// Trajets avec distance nulle ou négative
val invalidDistance = taxiDF.filter(col("trip_distance") <= 0).count()

// Tarifs négatifs
val invalidFare = taxiDF.filter(col("fare_amount") < 0).count()

println(s"Trajets avec distance invalide : $invalidDistance")
println(s"Trajets avec tarif négatif : $invalidFare")


Trajets avec distance invalide : 109420
Trajets avec tarif négatif : 395093


invalidDistance = 109420
invalidFare = 395093


395093

**Interprétations :**

**Distances invalides**

Trajets avec distance ≤ 0 :
109 420 trajets

- Une distance nulle ou négative est physiquement impossible pour un trajet réel.

- Ces cas peuvent correspondre à :

    - des trajets annulés,
    
    - des erreurs de capteur,
    
    - ou des erreurs de saisie.
 
**Tarifs négatifs**

Trajets avec tarif négatif :
395 093 trajets

- Les montants négatifs (course, taxes, surcharge) sont économiquement incohérents.

- Ils peuvent indiquer :

    - des remboursements,
    
    - des corrections comptables,
    
    - ou des erreurs de données.

**Conclusion – Analyse des Phases 1**

L’exploration initiale a mis en évidence :

- ✔ un volume massif de données adapté à Spark

- ✔ une structure riche permettant des analyses avancées

- ⚠ une forte présence de valeurs manquantes

- ⚠ des anomalies significatives (distances nulles, tarifs négatifs, dates incohérentes)

### Phase 2 : Nettoyage et transformation des données

**Tâches :**

- Supprimer ou corriger les lignes contenant des erreurs.

- Convertir les colonnes de dates en format timestamp.

- Créer des colonnes dérivées :

- trip_duration : durée du trajet (en minutes)

- average_speed : vitesse moyenne (km/h)

- hour : heure de la journée

- day_of_week : jour de la semaine

- Catégoriser les trajets :

- short_trip : trajets de moins de 10 km

- long_trip : trajets de 10 km ou plus

**Questions à traiter :**

- Quelle est la distribution des distances et durées?

- Quelle proportion de trajets courts vs longs observe-t-on?

- Quelles sont les vitesses moyennes selon les heures ou les jours?

In [8]:
// Suppression des lignes contenant des erreurs
val cleanedDF = taxiDF
  .filter(col("trip_distance") > 0)  //distance ≤ 0
  .filter(col("fare_amount") >= 0) //tarif négatif ou nul
    //dates manquantes ou invalides
  .filter(col("tpep_pickup_datetime").isNotNull)
  .filter(col("tpep_dropoff_datetime").isNotNull)


cleanedDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 18 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 18 more fields]

In [9]:
// Conversion des colonnes de dates en timestamp
val taxiTimeDF = cleanedDF
  .withColumn("pickup_ts", to_timestamp(col("tpep_pickup_datetime")))
  .withColumn("dropoff_ts", to_timestamp(col("tpep_dropoff_datetime")))



taxiTimeDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 20 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 20 more fields]

In [10]:
// Création des colonnes dérivées:

// Durée du trajet (en minutes):
val taxiDurationDF = taxiTimeDF.withColumn(
  "trip_duration",
  (unix_timestamp(col("dropoff_ts")) - unix_timestamp(col("pickup_ts"))) / 60
)
// Eliminer les durées négatives ou nulles :
val taxiValidDF = taxiDurationDF.filter(col("trip_duration") > 0)

//Vitesse moyenne (km/h): 1 mile = 1.60934 km
val taxiSpeedDF = taxiValidDF.withColumn(
  "average_speed",
  (col("trip_distance") * 1.60934) / (col("trip_duration") / 60)
)

// Heure de la journée et jour de la semaine
val taxiTimeFeaturesDF = taxiSpeedDF
  .withColumn("hour", hour(col("pickup_ts")))
  .withColumn("day_of_week", date_format(col("pickup_ts"), "E"))



taxiDurationDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 21 more fields]
taxiValidDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 21 more fields]
taxiSpeedDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 22 more fields]
taxiTimeFeaturesDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 24 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 24 more fields]

In [11]:
// Catégorisation des trajets:
val taxiCategorizedDF = taxiTimeFeaturesDF.withColumn(
  "trip_type",
  when(col("trip_distance") * 1.60934 < 10, "short_trip")
    .otherwise("long_trip")
)


taxiCategorizedDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]

In [12]:
// Distribution des distances et durées
taxiCategorizedDF.select("trip_distance", "trip_duration")
  .summary("count", "mean", "min", "max")
  .show()


+-------+------------------+--------------------+
|summary|     trip_distance|       trip_duration|
+-------+------------------+--------------------+
|  count|           3645537|             3645537|
|   mean|5.9212328581489615|  18.714519534433776|
|    min|              0.01|0.016666666666666666|
|    max|         231603.73|              5438.8|
+-------+------------------+--------------------+



**Interpretations:**

- Nombre de trajets après nettoyage : 3 645 537

Observation : Environ 500 000 trajets ont été supprimés lors du nettoyage, correspondant aux distances nulles ou aux tarifs négatifs.

→ Cela montre que le nettoyage a été nécessaire pour garantir la fiabilité des analyses.

- Distance moyenne : 5,9 miles → la majorité des trajets sont relativement courts, typique du trafic urbain new-yorkais.

- Durée moyenne : ~19 minutes → cohérente avec les distances observées.

- Valeurs extrêmes très élevées (distance > 200 000 miles et durée > 5 400 minutes)

→ probablement des outliers ou erreurs résiduelles. Ces cas sont rares mais peuvent fausser certaines analyses statistiques si non traités.

In [13]:
// Proportion de trajets courts vs longs:
val tripDistribution = taxiCategorizedDF
  .groupBy("trip_type")
  .count()

tripDistribution.show()


+----------+-------+
| trip_type|  count|
+----------+-------+
| long_trip| 560167|
|short_trip|3085370|
+----------+-------+



tripDistribution = [trip_type: string, count: bigint]


[trip_type: string, count: bigint]

**Interprétation :**

- ~85% des trajets sont courts (<10 km)

- ~15% sont longs (≥10 km)

→ Les trajets longs sont minoritaires mais significatifs pour étudier des comportements différents (ex. tarifs aéroport, covoiturage longue distance).

In [14]:
//Vitesses moyennes selon l’heure de la journée:
taxiCategorizedDF
  .groupBy("hour")
  .agg(avg("average_speed").alias("avg_speed_kmh"))
  .orderBy("hour")
  .show()


+----+------------------+
|hour|     avg_speed_kmh|
+----+------------------+
|   0|  36.4122066066946|
|   1|43.978351358497804|
|   2|24.899649157560855|
|   3|24.862497202198398|
|   4|147.75225396108019|
|   5|63.427353694195745|
|   6| 90.16784388947833|
|   7| 94.88026967746542|
|   8| 46.13608199339781|
|   9| 33.33180952067568|
|  10|  30.2702945524287|
|  11|26.286667484263663|
|  12| 29.29229682921085|
|  13| 19.41275671690131|
|  14| 27.40970201872671|
|  15| 38.32790821757619|
|  16|15.964023392571915|
|  17| 32.10978234636955|
|  18|26.072458781817424|
|  19|29.961285843842088|
+----+------------------+
only showing top 20 rows



**Interprétation :**

- Les vitesses sont plus basses aux heures de pointe (ex. 2–3h du matin relativement basses mais anomalies possibles à 4h).

- Les valeurs extrêmes (147 km/h à 4h) indiquent outliers ou trajets mal calculés (durée très courte et distance longue).

- Les vitesses nocturnes (0–5h) sont globalement plus élevées, ce qui correspond à un trafic plus fluide.

- Les vitesses matinales 6–9h sont élevées sur certaines données mais doivent être interprétées avec prudence à cause des outliers.

In [15]:
// Vitesses moyennes selon le jour de la semaine:
taxiCategorizedDF
  .groupBy("day_of_week")
  .agg(avg("average_speed").alias("avg_speed_kmh"))
  .show()


+-----------+------------------+
|day_of_week|     avg_speed_kmh|
+-----------+------------------+
|        Sun|31.251989996569428|
|        Mon| 38.47408613725878|
|        Thu| 29.13318651216655|
|        Sat| 36.98090570723452|
|        Wed| 39.88984226879196|
|        Fri|29.665435276732563|
|        Tue|22.736722991909428|
+-----------+------------------+



**Interprétation :**

- Les jours de semaine (Mon, Wed) présentent des vitesses moyennes plus élevées pour certains trajets, mais la variabilité reste importante.

- Mardi et jeudi semblent plus lents, probablement en raison du trafic dense ou d’anomalies dans les données.

- Week-end (Sat, Sun) : vitesses moyennes légèrement inférieures au mercredi, mais trafic global plus irrégulier.

**Observations générales Phase 2**

- Nettoyage efficace : suppression des trajets aberrants et préparation des variables dérivées (trip_duration, average_speed, hour, day_of_week, trip_type).

- Dominance des trajets courts : le trafic urbain est concentré sur des distances <10 km.

- Outliers persistants : certaines vitesses et durées extrêmes nécessiteront peut-être un filtrage supplémentaire pour les analyses avancées.

- Tendances temporelles : vitesse variable selon l’heure et le jour → insights potentiels pour optimisation du trafic et planification du covoiturage.

### Phase 3 : Analyse spatio-temporelle

**Questions à traiter :**

- Quelles zones présentent le plus de départs et d’arrivées?

- Quelles sont les heures de pointe pour les taxis?

- Quels sont les 3 principaux points de départ et d’arrivée pour les trajets courts etlongs?

In [21]:
val zonesDF = spark.read
  .option("header", "true")       
  .option("inferSchema", "true")  
  .csv("taxi_zone_lookup.csv")

taxiDF.printSchema()
taxiDF.show(5)


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)

+--------+--------------------+---------------------+---------------+------

zonesDF = [LocationID: int, Borough: string ... 2 more fields]


[LocationID: int, Borough: string ... 2 more fields]

In [22]:
// Zones avec le plus de départs et d’arrivées:

import org.apache.spark.sql.functions._

// Zones de départ (PULocationID)
val taxiWithPUZoneDF = taxiCategorizedDF.join(
  zonesDF.withColumnRenamed("LocationID", "PULocationID_Zone"),
  taxiCategorizedDF("PULocationID") === col("PULocationID_Zone"),
  "left"
).withColumnRenamed("Zone", "pickup_zone").drop("PULocationID_Zone")

// Zones d'arrivée (DOLocationID)
val taxiWithZonesDF = taxiWithPUZoneDF.join(
  zonesDF.withColumnRenamed("LocationID", "DOLocationID_Zone"),
  taxiWithPUZoneDF("DOLocationID") === col("DOLocationID_Zone"),
  "left"
).withColumnRenamed("Zone", "dropoff_zone").drop("DOLocationID_Zone")

// Top 10 zones de départ
val topPickupZones = taxiWithZonesDF.groupBy("pickup_zone").count()
  .orderBy(desc("count"))
topPickupZones.show(10)

// Top 10 zones d'arrivée
val topDropoffZones = taxiWithZonesDF.groupBy("dropoff_zone").count()
  .orderBy(desc("count"))
topDropoffZones.show(10)

+--------------------+------+
|         pickup_zone| count|
+--------------------+------+
|Upper East Side S...|181326|
|      Midtown Center|169425|
|Upper East Side N...|154944|
|         JFK Airport|153226|
|Penn Station/Madi...|120663|
|        Midtown East|119180|
|Times Sq/Theatre ...|114395|
| Lincoln Square East|112604|
|   LaGuardia Airport|102465|
|       Midtown North| 99923|
+--------------------+------+
only showing top 10 rows

+--------------------+------+
|        dropoff_zone| count|
+--------------------+------+
|Upper East Side S...|164757|
|Upper East Side N...|160677|
|      Midtown Center|135658|
|Times Sq/Theatre ...|106739|
|        Midtown East| 99033|
| Lincoln Square East| 98226|
|         Murray Hill| 97936|
|Upper West Side S...| 97431|
|        East Chelsea| 93632|
|     Lenox Hill West| 92304|
+--------------------+------+
only showing top 10 rows



taxiWithPUZoneDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 28 more fields]
taxiWithZonesDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 31 more fields]
topPickupZones = [pickup_zone: string, count: bigint]
topDropoffZones = [dropoff_zone: string, count: bigint]


[dropoff_zone: string, count: bigint]

**Interprétation :**

- Les quartiers centraux de Manhattan dominent le trafic urbain, confirmant leur rôle comme zones d’activités, commerces et tourisme.

- Les aéroports (JFK, LaGuardia) apparaissent surtout pour les trajets longs, ce qui reflète la demande pour les transferts vers/depuis Manhattan.

In [23]:
//  Heures de pointe

val taxiWithHourDF = taxiWithZonesDF.withColumn("hour", hour(col("tpep_pickup_datetime")))

// Nombre de trajets par heure
val tripsPerHour = taxiWithHourDF.groupBy("hour").count().orderBy("hour")
tripsPerHour.show(24)



+----+------+
|hour| count|
+----+------+
|   0|108073|
|   1| 75679|
|   2| 48013|
|   3| 34401|
|   4| 29954|
|   5| 29754|
|   6| 56726|
|   7|104617|
|   8|137193|
|   9|152095|
|  10|166420|
|  11|180505|
|  12|196417|
|  13|205247|
|  14|218700|
|  15|229285|
|  16|224075|
|  17|238247|
|  18|242858|
|  19|218367|
|  20|206524|
|  21|209281|
|  22|189688|
|  23|143418|
+----+------+



taxiWithHourDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 31 more fields]
tripsPerHour = [hour: int, count: bigint]


[hour: int, count: bigint]

**Interprétation :**

- Matin (7–9h) : pic classique lié aux trajets domicile → travail.

- Après-midi/soir (14–19h) : pic plus marqué, probablement lié aux retours du travail, aux activités commerciales et touristiques.

- Minuit–5h : trafic faible, correspondant aux heures creuses.

In [24]:
// Top 3 points de départ et d’arrivée pour trajets courts et longs

val shortTripsDF = taxiWithZonesDF.filter(col("trip_type") === "short_trip")
val longTripsDF = taxiWithZonesDF.filter(col("trip_type") === "long_trip")

// Trajets courts
val topShortPickups = shortTripsDF.groupBy("pickup_zone").count()
  .orderBy(desc("count")).limit(3)
val topShortDropoffs = shortTripsDF.groupBy("dropoff_zone").count()
  .orderBy(desc("count")).limit(3)

topShortPickups.show()
topShortDropoffs.show()

// Trajets longs
val topLongPickups = longTripsDF.groupBy("pickup_zone").count()
  .orderBy(desc("count")).limit(3)
val topLongDropoffs = longTripsDF.groupBy("dropoff_zone").count()
  .orderBy(desc("count")).limit(3)

topLongPickups.show()
topLongDropoffs.show()


+--------------------+------+
|         pickup_zone| count|
+--------------------+------+
|Upper East Side S...|174675|
|      Midtown Center|158147|
|Upper East Side N...|148143|
+--------------------+------+

+--------------------+------+
|        dropoff_zone| count|
+--------------------+------+
|Upper East Side S...|158550|
|Upper East Side N...|153103|
|      Midtown Center|125823|
+--------------------+------+

+--------------------+------+
|         pickup_zone| count|
+--------------------+------+
|         JFK Airport|137987|
|   LaGuardia Airport| 94376|
|Times Sq/Theatre ...| 12959|
+--------------------+------+

+--------------------+-----+
|        dropoff_zone|count|
+--------------------+-----+
|   LaGuardia Airport|33536|
|         JFK Airport|25874|
|Times Sq/Theatre ...|16108|
+--------------------+-----+



shortTripsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 31 more fields]
longTripsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 31 more fields]
topShortPickups = [pickup_zone: string, count: bigint]
topShortDropoffs = [dropoff_zone: string, count: bigint]
topLongPickups = [pickup_zone: string, count: bigint]
topLongDropoffs = [dropoff_zone: string, count: bigint]


[dropoff_zone: string, count: bigint]

**Interprétation :**

- Les trajets longs concernent surtout les transferts entre Manhattan et les aéroports ou les trajets inter-quartiers très longs.

- Confirme que la demande longue distance est concentrée sur les zones périphériques et les hubs de transport.

**Observations générales**

- Manhattan central reste le cœur du trafic urbain.

- Les aéroports influencent fortement les trajets longs.

- Les pics horaires correspondent aux habitudes de travail et aux loisirs, avec une demande accrue en fin d’après-midi et début de soirée.

- Ces informations permettent de :

    - planifier le dispatch des taxis,
    
    - identifier les zones à forte demande pour le covoiturage ou services premium,
    
    - améliorer la gestion du trafic et la planification des routes optimisées.

### Phase 4 : Analyse des modes de paiement

**Questions à traiter :**

- Comment les passagers paient-ils les trajets courts et longs?

- L’utilisation des modes de paiement évolue-t-elle dans le temps?

- Ya-t-il une relation entre le montant total et le mode de paiement?


In [25]:
//Analyse par type de paiement pour trajets courts et longs
// Comptage du nombre de trajets par type de paiement et par type de trajet
val paymentByTripTypeDF = taxiCategorizedDF.groupBy("trip_type", "payment_type")
  .count()
  .orderBy("trip_type", "payment_type")

paymentByTripTypeDF.show()


+----------+------------+-------+
| trip_type|payment_type|  count|
+----------+------------+-------+
| long_trip|           0| 108186|
| long_trip|           1| 390627|
| long_trip|           2|  52286|
| long_trip|           3|   1748|
| long_trip|           4|   7320|
|short_trip|           0| 516130|
|short_trip|           1|2238432|
|short_trip|           2| 295284|
|short_trip|           3|   9463|
|short_trip|           4|  26061|
+----------+------------+-------+



paymentByTripTypeDF = [trip_type: string, payment_type: bigint ... 1 more field]


[trip_type: string, payment_type: bigint ... 1 more field]

**Interpretations:**
- Les passagers préfèrent la commodité des paiements électroniques pour tous les trajets, mais encore un nombre non négligeable paye en espèces, surtout pour de petits trajets.

- Les trajets longs sont moins souvent payés en espèces que les trajets courts, probablement pour des raisons de montant plus élevé.


In [26]:
// Évolution dans le temps
// Extraire le mois et l'année
val taxiWithDateDF = taxiCategorizedDF.withColumn("year_month", date_format(col("tpep_pickup_datetime"), "yyyy-MM"))

// Comptage des paiements par mois
val paymentOverTimeDF = taxiWithDateDF.groupBy("year_month", "payment_type")
  .count()
  .orderBy("year_month")

paymentOverTimeDF.show()


+----------+------------+-------+
|year_month|payment_type|  count|
+----------+------------+-------+
|   2008-12|           2|      1|
|   2009-01|           2|      2|
|   2025-10|           1|     20|
|   2025-10|           2|      1|
|   2025-11|           1|2629039|
|   2025-11|           3|  11211|
|   2025-11|           2| 347566|
|   2025-11|           4|  33381|
|   2025-11|           0| 624316|
+----------+------------+-------+



taxiWithDateDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 26 more fields]
paymentOverTimeDF = [year_month: string, payment_type: bigint ... 1 more field]


[year_month: string, payment_type: bigint ... 1 more field]

**Observations :**

- La tendance pour novembre 2025 montre un usage très élevé du mode 1 et un paiement en espèces (0) plus modéré.

- Les autres méthodes (2, 3, 4) restent minoritaires.

**Interprétation :**

- Le mode principal de paiement reste constant sur le temps récent, ce qui suggère que l’évolution des habitudes de paiement est stable, avec une nette dominance des cartes.

- Les modes alternatifs sont utilisés pour un petit segment de clients ou des trajets particuliers.

In [27]:
// Relation entre montant total et mode de paiement
// Moyenne et médiane du montant total par mode de paiement
val amountByPaymentDF = taxiCategorizedDF.groupBy("payment_type")
  .agg(
    mean("total_amount").alias("avg_total_amount"),
    expr("percentile_approx(total_amount, 0.5)").alias("median_total_amount")
  )

amountByPaymentDF.show()


+------------+------------------+-------------------+
|payment_type|  avg_total_amount|median_total_amount|
+------------+------------------+-------------------+
|           1|30.427701143256034|              22.75|
|           3|26.231469984836334|               17.1|
|           2| 25.50867678453172|              18.25|
|           4|30.300562295917057|              19.65|
|           0|21.817728105638736|              19.72|
+------------+------------------+-------------------+



amountByPaymentDF = [payment_type: bigint, avg_total_amount: double ... 1 more field]


[payment_type: bigint, avg_total_amount: double ... 1 more field]

**Observations :**

- Les cartes (1) et certains modes alternatifs (4) sont associés à des montants plus élevés, en moyenne.

- Les paiements en espèces (0) ont la moyenne la plus faible, même si la médiane est proche de celle des autres modes.

- La différence entre moyenne et médiane suggère que certains paiements élevés font augmenter la moyenne, surtout pour les modes électroniques.

**Interprétation :**

- Les trajets plus coûteux sont souvent réglés par carte ou autres modes électroniques, ce qui est logique pour la commodité et la sécurité.

- Les trajets courts ou moins chers sont souvent payés en espèces, expliquant la moyenne plus basse pour le mode 0.

**Résumé général Phase 4 :**

- La majorité des paiements se fait via carte (mode 1), surtout pour les longs trajets.

- Le paiement en espèces reste important pour les trajets courts, mais minoritaire pour les longs trajets.

- L’usage des modes de paiement est stable dans le temps récent.

- Les montants élevés sont majoritairement réglés par carte ou modes électroniques, tandis que les espèces sont associées à des trajets de moindre valeur.

In [23]:
paymentByTripTypeDF.coalesce(1).write.option("header", "true").csv("output/payment_by_trip_type.csv")
paymentOverTimeDF.coalesce(1).write.option("header", "true").csv("output/payment_over_time.csv")
amountByPaymentDF.coalesce(1).write.option("header", "true").csv("output/amount_by_payment.csv")


In [24]:
// Exporter pour Python
taxiCategorizedDF
  .select("payment_type", "total_amount")
  .write
  .option("header", "true")
  .csv("output/total_amount_by_payment.csv")


### Phase 5 : Exploration du covoiturage (Ride-Sharing)

**Questions à traiter :**

- Peut-on regrouper des trajets courts ayant des départs proches dans le temps et l’espace?

- Quelle économie de temps ou d’argent cela pourrait-elle générer?

In [28]:
// Identifier les trajets courts
val shortTripsDF = taxiCategorizedDF.filter(col("trip_type") === "short_trip")


shortTripsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]

In [32]:
/* Regrouper les trajets proches dans le temps et l’espace

Proximité dans l’espace : trajets ayant des PULocationID proches et/ou des DOLocationID proches.*/
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._


// Fenêtre par zones + ordre temporel
val timeWindow = Window
  .partitionBy("PULocationID", "DOLocationID")
  .orderBy("pickup_ts")   // IMPORTANT : utiliser pickup_ts (timestamp propre)

// Calcul du trajet précédent et de la différence de temps
val shortTripsWithLagDF = shortTripsDF
  .withColumn("prev_pickup", lag("pickup_ts", 1).over(timeWindow))
  .withColumn(
    "time_diff_min",
    (unix_timestamp(col("pickup_ts")) - unix_timestamp(col("prev_pickup"))) / 60
  )


lastException = null
timeWindow = org.apache.spark.sql.expressions.WindowSpec@73251628
shortTripsWithLagDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]

In [33]:
shortTripsWithLagDF
  .select("PULocationID", "DOLocationID", "pickup_ts", "prev_pickup", "time_diff_min")
  .show(10, false)


+------------+------------+-------------------+-------------------+------------------+
|PULocationID|DOLocationID|pickup_ts          |prev_pickup        |time_diff_min     |
+------------+------------+-------------------+-------------------+------------------+
|3           |81          |2025-11-03 17:59:12|NULL               |NULL              |
|3           |81          |2025-11-17 08:21:00|2025-11-03 17:59:12|19581.8           |
|3           |81          |2025-11-17 17:18:45|2025-11-17 08:21:00|537.75            |
|3           |81          |2025-11-17 23:09:37|2025-11-17 17:18:45|350.8666666666667 |
|3           |81          |2025-11-23 09:05:52|2025-11-17 23:09:37|7796.25           |
|3           |81          |2025-11-23 22:32:04|2025-11-23 09:05:52|806.2             |
|3           |81          |2025-11-27 22:02:15|2025-11-23 22:32:04|5730.183333333333 |
|4           |158         |2025-11-01 00:39:37|NULL               |NULL              |
|4           |158         |2025-11-01 01:30

In [34]:
// Identification des trajets partageables:
val shareableTripsDF = shortTripsWithLagDF
  .filter(col("time_diff_min").isNotNull && col("time_diff_min") <= 10)

shareableTripsDF.select(
  "PULocationID",
  "DOLocationID",
  "pickup_ts",
  "prev_pickup",
  "time_diff_min",
  "total_amount",
  "trip_duration"
).show(10, false)


+------------+------------+-------------------+-------------------+------------------+------------+------------------+
|PULocationID|DOLocationID|pickup_ts          |prev_pickup        |time_diff_min     |total_amount|trip_duration     |
+------------+------------+-------------------+-------------------+------------------+------------+------------------+
|4           |158         |2025-11-06 23:02:51|2025-11-06 23:00:18|2.55              |22.95       |13.616666666666667|
|4           |158         |2025-11-08 01:49:52|2025-11-08 01:40:04|9.8               |22.46       |17.633333333333333|
|4           |158         |2025-11-08 01:53:12|2025-11-08 01:49:52|3.3333333333333335|21.35       |14.833333333333334|
|4           |158         |2025-11-09 01:55:51|2025-11-09 01:46:41|9.166666666666666 |25.11       |11.866666666666667|
|4           |158         |2025-11-15 00:49:44|2025-11-15 00:40:30|9.233333333333333 |22.26       |11.583333333333334|
|4           |158         |2025-11-15 01:14:06|2

shareableTripsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]

**Interpretations**:

L’analyse des trajets courts a montré que de nombreux déplacements partagent les mêmes zones de départ (PULocationID) et d’arrivée (DOLocationID) avec des temps de départ très rapprochés.

Les exemples observés indiquent des écarts temporels souvent inférieurs à 10 minutes, parfois même 2 à 4 minutes, ce qui constitue une condition réaliste pour le covoiturage urbain.
Cela suggère que, dans un contexte réel, une plateforme intelligente pourrait proposer un regroupement de passagers sans dégrader significativement leur expérience.

In [35]:
// Quantification du potentiel de covoiturage:

// Nombre total de trajets courts
val totalShortTrips = shortTripsDF.count()

// Nombre de trajets partageables
val totalShareableTrips = shareableTripsDF.count()

// Taux de trajets partageables
val shareableRate =
  totalShareableTrips.toDouble / totalShortTrips * 100

println(s"Taux de trajets courts partageables : ${shareableRate} %")

Taux de trajets courts partageables : 49.076934046808 %


totalShortTrips = 3085370
totalShareableTrips = 1514205
shareableRate = 49.076934046808


49.076934046808

**Interpretations :**

Les résultats globaux sont particulièrement significatifs :

- Nombre total de trajets courts :
3 085 370

- Nombre de trajets partageables :
1 514 205

- Proportion de trajets partageables :
≈ 49,1 %

Cela signifie qu’près d’un trajet court sur deux pourrait potentiellement être mutualisé.

Ce taux élevé indique que le covoiturage ne serait pas un phénomène marginal, mais pourrait concerner une part substantielle du trafic taxi, en particulier dans les zones urbaines denses.

In [36]:
// Estimation des économies financières

//Hypothèse simple : 2 passagers partagent le coût
val costSavingsDF = shareableTripsDF
  .withColumn("shared_cost", col("total_amount") / 2)
  .withColumn("money_saved", col("total_amount") - col("shared_cost"))

costSavingsDF.select(
  "total_amount",
  "shared_cost",
  "money_saved"
).show(10, false)

// Économie moyenne par trajet
val avgMoneySavedDF = costSavingsDF
  .agg(avg("money_saved").alias("avg_money_saved"))

avgMoneySavedDF.show()

+------------+-----------+-----------+
|total_amount|shared_cost|money_saved|
+------------+-----------+-----------+
|22.95       |11.475     |11.475     |
|22.46       |11.23      |11.23      |
|21.35       |10.675     |10.675     |
|25.11       |12.555     |12.555     |
|22.26       |11.13      |11.13      |
|24.78       |12.39      |12.39      |
|20.95       |10.475     |10.475     |
|20.77       |10.385     |10.385     |
|20.42       |10.21      |10.21      |
|21.35       |10.675     |10.675     |
+------------+-----------+-----------+
only showing top 10 rows

+------------------+
|   avg_money_saved|
+------------------+
|10.218844248301655|
+------------------+



costSavingsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 29 more fields]
avgMoneySavedDF = [avg_money_saved: double]


[avg_money_saved: double]

**Interpretations:**

En supposant un scénario simple où deux passagers partagent un trajet, les résultats montrent :

- Économie moyenne par passager :
≈ 10,22 unités monétaires par trajet

Les montants observés (souvent entre 10 et 12 unités) sont cohérents avec les tarifs des trajets courts et représentent une réduction significative du coût individuel.


- Le covoiturage rend les trajets plus accessibles financièrement

- Il peut attirer des passagers sensibles au prix

- Il améliore le taux de remplissage des taxis sans augmenter les distances parcourues

In [37]:
// Estimation des économies de temps

// Hypothèse : un trajet partagé évite un second déplacement complet
val timeSavingsDF = shareableTripsDF
  .withColumn("time_saved_min", col("trip_duration"))

timeSavingsDF.select("trip_duration", "time_saved_min").show(10)

// Temps moyen économisé
val avgTimeSavedDF = timeSavingsDF
  .agg(avg("time_saved_min").alias("avg_time_saved_min"))

avgTimeSavedDF.show()

+------------------+------------------+
|     trip_duration|    time_saved_min|
+------------------+------------------+
|13.616666666666667|13.616666666666667|
|17.633333333333333|17.633333333333333|
|14.833333333333334|14.833333333333334|
|11.866666666666667|11.866666666666667|
|11.583333333333334|11.583333333333334|
|              14.7|              14.7|
|13.766666666666667|13.766666666666667|
|              15.0|              15.0|
|             10.55|             10.55|
|15.466666666666667|15.466666666666667|
+------------------+------------------+
only showing top 10 rows

+------------------+
|avg_time_saved_min|
+------------------+
|12.849298817971613|
+------------------+



timeSavingsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 28 more fields]
avgTimeSavedDF = [avg_time_saved_min: double]


[avg_time_saved_min: double]

**Interpretations:**

L’analyse du temps économisé repose sur l’idée qu’un trajet partagé évite un second déplacement équivalent.

- Durée moyenne des trajets partageables :
~ 12 à 15 minutes

- Temps moyen économisé par mutualisation :
≈ 12,85 minutes

À l’échelle du système :

- Moins de trajets redondants

- Réduction de la congestion

- Diminution du temps total passé sur la route par les taxis

Cela a également un impact environnemental positif, en réduisant les émissions liées aux déplacements urbains.

In [38]:
// Zones les plus favorables au covoiturage
val topRideSharingZonesDF = shareableTripsDF
  .groupBy("PULocationID", "DOLocationID")
  .count()
  .orderBy(desc("count"))

topRideSharingZonesDF.show(10)

+------------+------------+-----+
|PULocationID|DOLocationID|count|
+------------+------------+-----+
|         237|         236|26724|
|         236|         237|23030|
|         237|         237|18239|
|         236|         236|15794|
|         161|         237|12385|
|         237|         161|11196|
|         161|         236| 9364|
|         237|         162| 8256|
|         142|         239| 7986|
|         239|         238| 7885|
+------------+------------+-----+
only showing top 10 rows



topRideSharingZonesDF = [PULocationID: int, DOLocationID: int ... 1 more field]


[PULocationID: int, DOLocationID: int ... 1 more field]

**Interpretations :**

Les paires de zones les plus fréquentes (ex. 237 ↔ 236, 161 ↔ 237) concentrent un grand nombre de trajets partageables.

Cela suggère :

- Des axes urbains très fréquentés

- Probablement des zones résidentielles ↔ zones d’activité

- Des corridors idéaux pour déployer en priorité des services de ride-sharing

Une stratégie ciblée sur ces zones permettrait de maximiser rapidement les bénéfices du covoiturage.

**Synthèse globale :**

Les résultats montrent que le covoiturage appliqué aux trajets courts présente un potentiel élevé et réaliste.
Près de la moitié des trajets analysés pourraient être mutualisés, générant des économies financières significatives pour les passagers et une réduction notable du temps et du trafic urbain.
Une implémentation intelligente, ciblant les zones les plus actives et les départs proches dans le temps, pourrait transformer durablement la mobilité urbaine.

### Extension à choisir

Vous pouvez aller au-delà des analyses de base et proposer des extensions permettant de valoriser davantage les données. L’objectif est de choisir au moins une extension parmi celles listées ci-dessous et de l’implémenter en utilisant les fonctionnalités avancées d’Apache Spark et/ou des bibliothèques associées.

**Extensions proposées**

- Analyse avancée et transformations :

    - Calcul des tendances horaires ou journalières des trajets.

    - Détection des anomalies (trajets très longs ou très courts).

    - Catégorisation des trajets par distance, tarif ou durée.

- Feature Engineering pour Machine Learning :

    - Création de nouvelles colonnes telles que average_speed, tip_percentage, fréquence de trajets par zone.
    
    - Encodage des informations temporelles (jour de la semaine, heure, périodes de pointe) pour des modèles prédictifs

- Modélisation prédictive :
    - Prédiction du tarif ou de la durée d’un trajet.
      
    - Prédiction de la demande par zone et par période.
 
    - Segmentation des trajets ou clients via clustering (ex. KMeans).
 
- Optimisation et ride-sharing :

    - Identification de trajets courts proches dans le temps et l’espace pour proposer du covoiturage.
    - Calcul des gains potentiels en temps et en coût.

- Visualisations avancées :
    - Cartes heatmaps des zones de départ et d’arrivée.
    - Graphiques temporels pour la demande, les revenus ou les pourboires.
    - Dashboard interactif avec Databricks, Plotly ou Folium.
- Extension Big Data / temps réel :
    - Intégration de données externes (météo, trafic, événements).
    - Transformation du pipeline batch en un pipeline streaming pour analyser la demande dynamique en quasi temps réel.
**Remarque** : Il faut choisir au moins une extension et justifier les choix techniques et méthodologiques dans le rapport final.

### Extension choisie : Optimisation & Ride-Sharing + Détection d’anomalies (Analyse avancée)

Cette extension s’appuie directement sur les phases précédentes, en particulier :

- l’analyse spatio-temporelle (Phase 3),

- l’analyse économique (Phase 4),

- et l’exploration du covoiturage (Phase 5).

Elle permet donc de valoriser fortement les données sans repartir de zéro.

**Extension 1 : Optimisation & Ride-Sharing (approfondie)**

**Objectif**

- Aller au-delà de l’identification des trajets partageables pour :

    - quantifier précisément les gains (temps et coût),
    
    - identifier les zones prioritaires pour le covoiturage,
    
    - proposer une logique d’optimisation réaliste pour un service de taxi.

**Méthodologie**

- Utilisation de Window Functions Spark (lag, partitionBy, orderBy)

- Regroupement des trajets courts ayant :

    - même PULocationID et DOLocationID,
    
    - un écart temporel inférieur à un seuil (ex. 10 minutes)

- Estimation :
    
    - du taux de trajets partageables,
    
    - de l’économie moyenne par trajet,
    
    - du temps économisé à l’échelle du système

**Justification technique**

- Spark est particulièrement adapté pour :

    - les calculs sur grands volumes,
    
    - les analyses temporelles complexes,
    
    - les agrégations massives par zones.

- Les fenêtres analytiques permettent une approche scalable, impossible à réaliser efficacement avec des outils classiques.

**Apport pour le projet**

- Donne une dimension décisionnelle (où et quand proposer le covoiturage)

- Introduit une logique d’optimisation réelle, proche des plateformes modernes (Uber Pool, Lyft Shared)

- Montre une maîtrise avancée de Spark SQL

**Extension 2 : Détection des anomalies de trajets**

**Objectif**

- Identifier des trajets atypiques pouvant indiquer :

    - erreurs de saisie,
    
    - comportements anormaux,
    
    - conditions exceptionnelles (trafic, incidents).

**Méthodologie**

- Analyse des distributions de :

    - trip_duration
    
    - trip_distance
    
    - average_speed

- Détection de trajets :

    - très courts (durée proche de 0),
    
    - très longs (durée ou distance extrême),
    
    - vitesses irréalistes (ex. > 120 km/h en zone urbaine)

- Utilisation de :
    
    - quantiles (percentile_approx)
    
    - règles statistiques simples (IQR, seuils métier)

**Justification technique**

- Les anomalies sont difficiles à détecter sans traitement massif

- Spark permet :

    - une analyse globale robuste,
    
    - une détection cohérente même sur des millions de lignes

- Approche interprétable (contrairement à certains modèles black-box)

**Apport pour le projet**

- Améliore la qualité des données

- Renforce la crédibilité des analyses

- Prépare les données pour une éventuelle modélisation prédictive

**Pourquoi ces extensions plutôt qu’une autre ?**

- Elles réutilisent tout le travail existant
- Elles sont réalistes et applicables dans un contexte professionnel
- Elles démontrent :



## EXTENSION 1 — Détection des anomalies de trajets :

**Objectif**

Identifier les trajets anormalement courts ou longs.

In [40]:
import org.apache.spark.sql.functions._

// Calcul des quantiles
val durationQuantiles = taxiCategorizedDF.stat.approxQuantile(
  "trip_duration",
  Array(0.01, 0.99),
  0.01
)

val minDuration = durationQuantiles(0)
val maxDuration = durationQuantiles(1)

// Détection des anomalies
val durationAnomaliesDF = taxiCategorizedDF.filter(
  col("trip_duration") < minDuration || col("trip_duration") > maxDuration
)
val cleanShortTripsDF = taxiCategorizedDF.filter(
  col("trip_duration") >= minDuration && col("trip_duration") <= maxDuration
)
durationAnomaliesDF.select(
  "trip_duration",
  "trip_distance",
  "average_speed"
).show(10)



+-------------+-------------+-------------+
|trip_duration|trip_distance|average_speed|
+-------------+-------------+-------------+
+-------------+-------------+-------------+



durationQuantiles = Array(0.016666666666666666, 5438.8)
minDuration = 0.016666666666666666
maxDuration = 5438.8
durationAnomaliesDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]
cleanShortTripsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]

In [41]:
val speedAnomaliesDF = taxiCategorizedDF.filter(
  col("average_speed") > 120 || col("average_speed") < 2
)

speedAnomaliesDF.select(
  "average_speed",
  "trip_distance",
  "trip_duration"
).show(10)


+-------------------+-------------+------------------+
|      average_speed|trip_distance|     trip_duration|
+-------------------+-------------+------------------+
| 0.2621549321266968|         0.01| 3.683333333333333|
| 1.8892252173913042|         0.39|19.933333333333334|
| 1.5518635714285713|         0.18|              11.2|
|  1.362137304075235|         0.15|10.633333333333333|
|0.25495616968843515|         3.75|           1420.25|
|  1.720878415841584|         0.12| 6.733333333333333|
| 1.6435812765957447|         0.08|               4.7|
| 1.5022361279170267|          0.3|19.283333333333335|
| 1.3845057352941177|         0.13| 9.066666666666666|
| 1.0345757142857144|         0.08| 7.466666666666667|
+-------------------+-------------+------------------+
only showing top 10 rows



speedAnomaliesDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 25 more fields]

## EXTENSION 2 — Optimisation & Ride-Sharing 

In [42]:
//Regroupement spatio-temporel des trajets courts
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Fenêtre par zones
val timeWindow = Window
  .partitionBy("PULocationID", "DOLocationID")
  .orderBy(col("pickup_ts"))

// Calcul de l'écart temporel
val shortTripsWithLagDF = shortTripsDF
  .withColumn("prev_pickup", lag("pickup_ts", 1).over(timeWindow))
  .withColumn(
    "time_diff_min",
    (unix_timestamp(col("pickup_ts")) - unix_timestamp(col("prev_pickup"))) / 60
  )

shortTripsWithLagDF.select(
  "PULocationID",
  "DOLocationID",
  "pickup_ts",
  "prev_pickup",
  "time_diff_min"
).show(10)


+------------+------------+-------------------+-------------------+------------------+
|PULocationID|DOLocationID|          pickup_ts|        prev_pickup|     time_diff_min|
+------------+------------+-------------------+-------------------+------------------+
|           3|          81|2025-11-03 17:59:12|               NULL|              NULL|
|           3|          81|2025-11-17 08:21:00|2025-11-03 17:59:12|           19581.8|
|           3|          81|2025-11-17 17:18:45|2025-11-17 08:21:00|            537.75|
|           3|          81|2025-11-17 23:09:37|2025-11-17 17:18:45| 350.8666666666667|
|           3|          81|2025-11-23 09:05:52|2025-11-17 23:09:37|           7796.25|
|           3|          81|2025-11-23 22:32:04|2025-11-23 09:05:52|             806.2|
|           3|          81|2025-11-27 22:02:15|2025-11-23 22:32:04| 5730.183333333333|
|           4|         158|2025-11-01 00:39:37|               NULL|              NULL|
|           4|         158|2025-11-01 01:30

timeWindow = org.apache.spark.sql.expressions.WindowSpec@177e9b13
shortTripsWithLagDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]

In [43]:
// Identification des trajets partageables
//Critère
//- même zone de départ et arrivée
//- moins de 10 minutes d’écart
val shareableTripsDF = shortTripsWithLagDF.filter(
  col("time_diff_min").isNotNull && col("time_diff_min") <= 10
)


shareableTripsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]


[VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 27 more fields]

In [44]:
// Taux de trajets partageables
val totalShortTrips = shortTripsDF.count()
val totalShareableTrips = shareableTripsDF.count()

val shareableRate =
  totalShareableTrips.toDouble / totalShortTrips.toDouble * 100

println(s"Taux de trajets courts partageables : $shareableRate %")

Taux de trajets courts partageables : 49.076934046808 %


totalShortTrips = 3085370
totalShareableTrips = 1514205
shareableRate = 49.076934046808


49.076934046808

In [45]:
// Économie financière estimée
//Hypothèse réaliste :Le coût est divisé par 2 lors du covoiturage.

val costSavingsDF = shareableTripsDF
  .withColumn("shared_cost", col("total_amount") / 2)
  .withColumn("money_saved", col("total_amount") - col("shared_cost"))

costSavingsDF.select(
  "total_amount",
  "shared_cost",
  "money_saved"
).show(10)

//Moyenne économisée
val avgMoneySavedDF = costSavingsDF
  .agg(avg("money_saved").alias("avg_money_saved"))

avgMoneySavedDF.show()


+------------+-----------+-----------+
|total_amount|shared_cost|money_saved|
+------------+-----------+-----------+
|       22.95|     11.475|     11.475|
|       22.46|      11.23|      11.23|
|       21.35|     10.675|     10.675|
|       25.11|     12.555|     12.555|
|       22.26|      11.13|      11.13|
|       24.78|      12.39|      12.39|
|       20.95|     10.475|     10.475|
|       20.77|     10.385|     10.385|
|       20.42|      10.21|      10.21|
|       21.35|     10.675|     10.675|
+------------+-----------+-----------+
only showing top 10 rows

+------------------+
|   avg_money_saved|
+------------------+
|10.218844248301655|
+------------------+



costSavingsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 29 more fields]
avgMoneySavedDF = [avg_money_saved: double]


[avg_money_saved: double]

In [46]:
//Gain de temps estimé
//Hypothèse simple : un trajet partagé évite un trajet supplémentaire

val timeSavingsDF = shareableTripsDF
  .withColumn("time_saved_min", col("trip_duration"))

val avgTimeSavedDF = timeSavingsDF
  .agg(avg("time_saved_min").alias("avg_time_saved_min"))

avgTimeSavedDF.show()

+------------------+
|avg_time_saved_min|
+------------------+
|12.849298817971613|
+------------------+



timeSavingsDF = [VendorID: int, tpep_pickup_datetime: timestamp_ntz ... 28 more fields]
avgTimeSavedDF = [avg_time_saved_min: double]


[avg_time_saved_min: double]

In [47]:
// Zones les plus favorables au covoiturage
val topRideSharingZonesDF = shareableTripsDF
  .groupBy("PULocationID", "DOLocationID")
  .count()
  .orderBy(desc("count"))

topRideSharingZonesDF.show(10)

+------------+------------+-----+
|PULocationID|DOLocationID|count|
+------------+------------+-----+
|         237|         236|26724|
|         236|         237|23030|
|         237|         237|18239|
|         236|         236|15794|
|         161|         237|12385|
|         237|         161|11196|
|         161|         236| 9364|
|         237|         162| 8256|
|         142|         239| 7986|
|         239|         238| 7885|
+------------+------------+-----+
only showing top 10 rows



topRideSharingZonesDF = [PULocationID: int, DOLocationID: int ... 1 more field]


[PULocationID: int, DOLocationID: int ... 1 more field]