In [2]:
!pip install holidays

Collecting holidays
  Downloading holidays-0.70-py3-none-any.whl (903 kB)
[K     |████████████████████████████████| 903 kB 3.2 MB/s eta 0:00:01
Installing collected packages: holidays
Successfully installed holidays-0.70


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, udf, date_format, to_date, lit, when, count, sum as spark_sum, avg, collect_set, size, split
)
from pyspark.sql.types import StringType, BooleanType
import holidays

# 1. Processing data NYC

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean

# Initialiser une session Spark
spark = SparkSession.builder \
    .appName("Taxi Data Analysis") \
    .getOrCreate()

# Lire le fichier Parquet
data_yellow = spark.read.parquet("data/nyc_yellow_taxi_parquet/yellow_tripdata_2024-01.parquet")

# Afficher un aperçu des données
data_yellow.printSchema()

                                                                                

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



### **Colonnes intéressantes dans `data_yellow`**

| **Colonne**             | **Utilité**                                                                 |
|--------------------------|-----------------------------------------------------------------------------|
| `tpep_pickup_datetime`   | Date et heure de départ, utilisée pour l'analyse temporelle et les tendances. |
| `PULocationID`           | Identifiant de la zone de départ (pour localiser l'emplacement sur la carte). |
| `DOLocationID`           | Identifiant de la zone d'arrivée (facultatif pour cette application).       |
| `trip_distance`          | Distance du trajet (utile pour prédire les tarifs et analyser les trajets). |
| `fare_amount`            | Montant de la course (pour prédire les revenus).                           |
| `tip_amount`             | Montant du pourboire (utile pour des métriques financières).               |
| `total_amount`           | Montant total payé par l'utilisateur (tarif + taxes + pourboires).         |
| `passenger_count`        | Nombre de passagers (utile pour des analyses liées à la demande).          |
| `tpep_dropoff_datetime`  | Date et heure de fin de course (facultatif pour cette application).         |

---

### **Pourquoi ces colonnes ?**

1. **Localisation et Carte Interactive :**
   - Utilisez `PULocationID` pour identifier la position initiale de la course.
   - Vous pouvez convertir ces identifiants en latitude/longitude avec une table de correspondance (disponible sur le site NYC TLC).

2. **Analyse Temporelle :**
   - `tpep_pickup_datetime` permet d'étudier les variations horaires, journalières ou mensuelles de la demande.

3. **Analyse des Trajets et Revenus :**
   - `trip_distance`, `fare_amount`, `tip_amount`, et `total_amount` fournissent des informations clés sur les revenus, les trajets et le comportement des utilisateurs.

4. **Interaction Utilisateur :**
   - `passenger_count` peut être utile pour prédire les tarifs moyens ou pour segmenter les trajets.


In [6]:
# Sélection des colonnes pertinentes
selected_columns = [
    "tpep_pickup_datetime",
    "PULocationID",
    "trip_distance",
    "fare_amount",
    "tip_amount",
    "total_amount",
    "passenger_count"
]

filtered_data = data_yellow.select([col(c) for c in selected_columns])

In [7]:
# Prétraitement des dates
yellow_taxi_df = filtered_data.withColumn("pickup_date", filtered_data['tpep_pickup_datetime'].cast("date"))
yellow_taxi_df = yellow_taxi_df.withColumn("hour", date_format(col("tpep_pickup_datetime"), "HH").cast("int"))
yellow_taxi_df.show(10)

                                                                                

+--------------------+------------+-------------+-----------+----------+------------+---------------+-----------+----+
|tpep_pickup_datetime|PULocationID|trip_distance|fare_amount|tip_amount|total_amount|passenger_count|pickup_date|hour|
+--------------------+------------+-------------+-----------+----------+------------+---------------+-----------+----+
| 2024-01-01 00:57:55|         186|         1.72|       17.7|       0.0|        22.7|              1| 2024-01-01|   0|
| 2024-01-01 00:03:00|         140|          1.8|       10.0|      3.75|       18.75|              1| 2024-01-01|   0|
| 2024-01-01 00:17:06|         236|          4.7|       23.3|       3.0|        31.3|              1| 2024-01-01|   0|
| 2024-01-01 00:36:38|          79|          1.4|       10.0|       2.0|        17.0|              1| 2024-01-01|   0|
| 2024-01-01 00:46:51|         211|          0.8|        7.9|       3.2|        16.1|              1| 2024-01-01|   0|
| 2024-01-01 00:54:08|         148|          4.7

In [8]:
print(f"Nombre total de lignes : {yellow_taxi_df.count()}")



Nombre total de lignes : 2964624


                                                                                

In [9]:
#from pyspark.sql.functions import col, sum

#result_df_new=result_df.select("pickup_date", "trip_distance", "TMAX", "PRCP")

yellow_taxi_df.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in yellow_taxi_df.columns]).show()



+--------------------+------------+-------------+-----------+----------+------------+---------------+-----------+----+
|tpep_pickup_datetime|PULocationID|trip_distance|fare_amount|tip_amount|total_amount|passenger_count|pickup_date|hour|
+--------------------+------------+-------------+-----------+----------+------------+---------------+-----------+----+
|                   0|           0|            0|          0|         0|           0|         140162|          0|   0|
+--------------------+------------+-------------+-----------+----------+------------+---------------+-----------+----+



                                                                                

In [10]:
# Compter le nombre de trajets pour chaque passenger_count
passenger_count_distribution = yellow_taxi_df.groupBy("passenger_count").count()

# Afficher les résultats
passenger_count_distribution.show()



+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              0|  31465|
|              7|      8|
|              6|  22353|
|              5|  33506|
|              1|2188739|
|              3|  91262|
|              8|     51|
|              2| 405103|
|              4|  51974|
|              9|      1|
|           NULL| 140162|
+---------------+-------+



                                                                                

# 2 Processing weather data

In [12]:
from pyspark.sql import SparkSession

# Initialiser une session Spark
spark = SparkSession.builder \
    .appName("Taxi Data Analysis") \
    .getOrCreate()

# Lire les fichiers csv depuis le répertoire "data"
meteo_df = spark.read.csv("data/nyc_weather_data/nyc_weather_2023_to_today.csv", header=True, inferSchema=True)

meteo_df.show()

meteo_df.printSchema()

                                                                                

+----------+-----------+---------+---------------+----+---------+---------------+----+----+--------+--------------------+---------------+
|      DATE|    STATION|LONGITUDE|PRCP_ATTRIBUTES|TMAX|ELEVATION|TMAX_ATTRIBUTES|TMIN|PRCP|LATITUDE|                NAME|TMIN_ATTRIBUTES|
+----------+-----------+---------+---------------+----+---------+---------------+----+----+--------+--------------------+---------------+
|2023-01-01|USW00094728|-73.96925|      T,,W,2400|12.8|     42.7|            ,,W| 9.4| 0.0|40.77898|NY CITY CENTRAL P...|            ,,W|
|2023-01-02|USW00094728|-73.96925|       ,,W,2400|13.3|     42.7|            ,,W| 9.4| 0.5|40.77898|NY CITY CENTRAL P...|            ,,W|
|2023-01-03|USW00094728|-73.96925|       ,,W,2400|14.4|     42.7|            ,,W| 8.3|10.7|40.77898|NY CITY CENTRAL P...|            ,,W|
|2023-01-04|USW00094728|-73.96925|       ,,W,2400|18.9|     42.7|            ,,W| 9.4| 0.5|40.77898|NY CITY CENTRAL P...|            ,,W|
|2023-01-05|USW00094728|-73.96925|

In [13]:
from pyspark.sql.functions import col

# Filtrer pour garder uniquement les lignes de janvier 2024
meteo_df_01_2024 = meteo_df.filter((col("DATE") >= "2024-01-01") & (col("DATE") <= "2024-01-31"))

# Afficher les résultats
meteo_df_01_2024.show()

+----------+-----------+---------+---------------+----+---------+---------------+----+----+--------+--------------------+---------------+
|      DATE|    STATION|LONGITUDE|PRCP_ATTRIBUTES|TMAX|ELEVATION|TMAX_ATTRIBUTES|TMIN|PRCP|LATITUDE|                NAME|TMIN_ATTRIBUTES|
+----------+-----------+---------+---------------+----+---------+---------------+----+----+--------+--------------------+---------------+
|2024-01-01|USW00094728|-73.96925|       ,,W,2400| 8.3|     42.7|            ,,W| 1.7| 0.8|40.77898|NY CITY CENTRAL P...|            ,,W|
|2024-01-02|USW00094728|-73.96925|       ,,W,2400| 5.6|     42.7|            ,,W|-1.6| 0.0|40.77898|NY CITY CENTRAL P...|            ,,W|
|2024-01-03|USW00094728|-73.96925|       ,,W,2400| 6.1|     42.7|            ,,W| 1.1| 0.0|40.77898|NY CITY CENTRAL P...|            ,,W|
|2024-01-04|USW00094728|-73.96925|       ,,W,2400| 7.2|     42.7|            ,,W|-2.1| 0.0|40.77898|NY CITY CENTRAL P...|            ,,W|
|2024-01-05|USW00094728|-73.96925|

In [14]:
weather_data = meteo_df_01_2024.withColumn("date", to_date(col("DATE")))

# Garder uniquement les colonnes nécessaires
weather_data = weather_data.select("date", "TMAX", "TMIN", "PRCP")
weather_data.show()

+----------+----+----+----+
|      date|TMAX|TMIN|PRCP|
+----------+----+----+----+
|2024-01-01| 8.3| 1.7| 0.8|
|2024-01-02| 5.6|-1.6| 0.0|
|2024-01-03| 6.1| 1.1| 0.0|
|2024-01-04| 7.2|-2.1| 0.0|
|2024-01-05| 2.8|-3.2| 0.0|
|2024-01-06| 3.3|-0.5|10.4|
|2024-01-07| 3.3| 1.1| 6.1|
|2024-01-08| 7.2| 2.2| 0.0|
|2024-01-09|13.9| 2.2|43.9|
|2024-01-10|13.9| 6.7| 5.6|
|2024-01-11| 8.3| 5.0| 0.0|
|2024-01-12|10.0| 4.4| 2.0|
|2024-01-13|15.6| 1.7|20.6|
|2024-01-14| 6.7|-3.2| 0.0|
|2024-01-15|-1.6|-4.9| 1.0|
|2024-01-16| 0.0|-5.5| 7.1|
|2024-01-17|-4.3|-8.2| 0.0|
|2024-01-18| 1.1|-5.5| 0.0|
|2024-01-19| 0.0|-3.2| 1.0|
|2024-01-20|-3.2|-7.7| 0.0|
+----------+----+----+----+
only showing top 20 rows



In [15]:
from pyspark.sql.functions import col, sum

weather_data_new=weather_data.select("date", "TMAX", "TMIN", "PRCP")

weather_data_new.select([sum(col(c).isNull().cast("int")).alias(c) for c in weather_data_new.columns]).show()

+----+----+----+----+
|date|TMAX|TMIN|PRCP|
+----+----+----+----+
|   0|   0|   0|   0|
+----+----+----+----+



In [16]:
weather_data = weather_data.fillna({"TMAX": 0, "TMIN": 0, "PRCP": 0})

In [17]:
from pyspark.sql.functions import col, sum

weather_data_new=weather_data.select("date", "TMAX", "TMIN", "PRCP")

weather_data_new.select([sum(col(c).isNull().cast("int")).alias(c) for c in weather_data_new.columns]).show()

+----+----+----+----+
|date|TMAX|TMIN|PRCP|
+----+----+----+----+
|   0|   0|   0|   0|
+----+----+----+----+



In [18]:
print(f"Nombre total de lignes : {weather_data_new.count()}")

Nombre total de lignes : 93


In [19]:
# Catégoriser la température
def categorize_temp(tmax):
    if tmax is None:
        return "unknown"
    elif tmax < 7:
        return "froid"
    elif 7 <= tmax <= 15:
        return "modéré"
    else:
        return "chaud"

categorize_temp_udf = udf(categorize_temp, StringType())
weather_data_new = weather_data_new.withColumn("temp_cat", categorize_temp_udf(col("TMAX")))

# Catégoriser la météo
weather_data_new = weather_data_new.withColumn(
    "weather", when(col("PRCP") > 0, "rainy").otherwise("clear")
)
weather_data_new.show()

[Stage 26:>                                                         (0 + 1) / 1]

+----------+----+----+----+--------+-------+
|      date|TMAX|TMIN|PRCP|temp_cat|weather|
+----------+----+----+----+--------+-------+
|2024-01-01| 8.3| 1.7| 0.8|  modéré|  rainy|
|2024-01-02| 5.6|-1.6| 0.0|   froid|  clear|
|2024-01-03| 6.1| 1.1| 0.0|   froid|  clear|
|2024-01-04| 7.2|-2.1| 0.0|  modéré|  clear|
|2024-01-05| 2.8|-3.2| 0.0|   froid|  clear|
|2024-01-06| 3.3|-0.5|10.4|   froid|  rainy|
|2024-01-07| 3.3| 1.1| 6.1|   froid|  rainy|
|2024-01-08| 7.2| 2.2| 0.0|  modéré|  clear|
|2024-01-09|13.9| 2.2|43.9|  modéré|  rainy|
|2024-01-10|13.9| 6.7| 5.6|  modéré|  rainy|
|2024-01-11| 8.3| 5.0| 0.0|  modéré|  clear|
|2024-01-12|10.0| 4.4| 2.0|  modéré|  rainy|
|2024-01-13|15.6| 1.7|20.6|   chaud|  rainy|
|2024-01-14| 6.7|-3.2| 0.0|   froid|  clear|
|2024-01-15|-1.6|-4.9| 1.0|   froid|  rainy|
|2024-01-16| 0.0|-5.5| 7.1|   froid|  rainy|
|2024-01-17|-4.3|-8.2| 0.0|   froid|  clear|
|2024-01-18| 1.1|-5.5| 0.0|   froid|  clear|
|2024-01-19| 0.0|-3.2| 1.0|   froid|  rainy|
|2024-01-2

                                                                                

In [20]:
weather_data_new.select([sum(col(c).isNull().cast("int")).alias(c) for c in weather_data_new.columns]).show()

+----+----+----+----+--------+-------+
|date|TMAX|TMIN|PRCP|temp_cat|weather|
+----+----+----+----+--------+-------+
|   0|   0|   0|   0|       0|      0|
+----+----+----+----+--------+-------+



# 3 Joindre les informations

In [22]:
yellow_taxi_df.printSchema()
weather_data_new.printSchema()

root
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- hour: integer (nullable = true)

root
 |-- date: date (nullable = true)
 |-- TMAX: double (nullable = false)
 |-- TMIN: double (nullable = false)
 |-- PRCP: double (nullable = false)
 |-- temp_cat: string (nullable = true)
 |-- weather: string (nullable = false)



In [23]:
# Renommer la colonne 'date' de weather_data_new pour qu'elle corresponde à 'pickup_date'
weather_data_new = weather_data_new.withColumnRenamed("date", "pickup_date")

# Effectuer la jointure entre yellow_taxi_df et weather_data_new sur 'pickup_date'
merged_df = yellow_taxi_df.join(weather_data_new, on="pickup_date", how="left")

# Afficher le résultat
merged_df.show()

+-----------+--------------------+------------+-------------+-----------+----------+------------+---------------+----+----+----+----+--------+-------+
|pickup_date|tpep_pickup_datetime|PULocationID|trip_distance|fare_amount|tip_amount|total_amount|passenger_count|hour|TMAX|TMIN|PRCP|temp_cat|weather|
+-----------+--------------------+------------+-------------+-----------+----------+------------+---------------+----+----+----+----+--------+-------+
| 2024-01-01| 2024-01-01 00:57:55|         186|         1.72|       17.7|       0.0|        22.7|              1|   0| 8.3| 1.7| 0.0|  modéré|  clear|
| 2024-01-01| 2024-01-01 00:57:55|         186|         1.72|       17.7|       0.0|        22.7|              1|   0| 8.3| 2.2| 0.3|  modéré|  rainy|
| 2024-01-01| 2024-01-01 00:57:55|         186|         1.72|       17.7|       0.0|        22.7|              1|   0| 8.3| 1.7| 0.8|  modéré|  rainy|
| 2024-01-01| 2024-01-01 00:03:00|         140|          1.8|       10.0|      3.75|       18.

In [24]:
merged_df.printSchema()

root
 |-- pickup_date: date (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- TMAX: double (nullable = true)
 |-- TMIN: double (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- temp_cat: string (nullable = true)
 |-- weather: string (nullable = true)



# 4. Rajout du champs is_business_day sur les données yellow_taxi

In [25]:
import holidays
import calendar
from datetime import date, datetime

# Given a date, return if its business day
def isWorkDay(date):
    is_work_day = False

    if date.weekday() <= 4:
        is_work_day = True

    us_holidays = holidays.UnitedStates()

    if date in us_holidays:
        is_work_day = False

    return is_work_day


# Given a date and return a list containing all off-day in that month in datetime.date format.
def offDayInMonth(_date):
    month = _date.month
    year = _date.year

    if calendar.isleap(year):
        last_day_list = [31,29,31,30,31,30,31,31,30,31,30,31]
    else:
        last_day_list = [31,28,31,30,31,30,31,31,30,31,30,31]

    off_day_list = []

    for day in range(1,last_day_list[month-1]+1):
        one_date = date(year=year, month=month, day=day)
        if not isWorkDay(one_date):
            off_day_list.append(one_date)

    return off_day_list

# Déterminer si c'est un jour ouvré
us_holidays = holidays.US()
is_business_day_udf = udf(
    lambda dt: dt.weekday() <= 4 and dt.date() not in us_holidays, BooleanType()
)
merged_df = merged_df.withColumn("is_business_day", is_business_day_udf(col("tpep_pickup_datetime")))
merged_df.printSchema()

root
 |-- pickup_date: date (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- hour: integer (nullable = true)
 |-- TMAX: double (nullable = true)
 |-- TMIN: double (nullable = true)
 |-- PRCP: double (nullable = true)
 |-- temp_cat: string (nullable = true)
 |-- weather: string (nullable = true)
 |-- is_business_day: boolean (nullable = true)



In [26]:
merged_df.select([sum(col(c).isNull().cast("int")).alias(c) for c in merged_df.columns]).show()



+-----------+--------------------+------------+-------------+-----------+----------+------------+---------------+----+----+----+----+--------+-------+---------------+
|pickup_date|tpep_pickup_datetime|PULocationID|trip_distance|fare_amount|tip_amount|total_amount|passenger_count|hour|TMAX|TMIN|PRCP|temp_cat|weather|is_business_day|
+-----------+--------------------+------------+-------------+-----------+----------+------------+---------------+----+----+----+----+--------+-------+---------------+
|          0|                   0|           0|            0|          0|         0|           0|         420486|   0|  18|  18|  18|      18|     18|              0|
+-----------+--------------------+------------+-------------+-----------+----------+------------+---------------+----+----+----+----+--------+-------+---------------+



                                                                                

In [28]:
merged_df.write.mode("overwrite").option("header", True).csv("data/final/merged_2024_01.csv")

                                                                                

In [29]:
print(f"Nombre de lignes : {merged_df.count()}")
print(f"Nombre de colonnes : {len(merged_df.columns)}")



Nombre de lignes : 8893836
Nombre de colonnes : 15


                                                                                

In [30]:
merged_df.write.mode("overwrite").parquet("data/final/merged_2024_01.parquet")

                                                                                

In [37]:
merged_df.coalesce(1).write.csv("data/final/merged_2024_01.csv", header=True, mode="overwrite")

                                                                                

In [34]:
merged_df.coalesce(1).write.parquet("data/final/merged_2024_01.parquet", mode="overwrite")

                                                                                