In [0]:
from pyspark.sql.functions import col
from functools import reduce

all_files = dbutils.fs.ls("/Volumes/workspace/default/data/Data_as_paquets/Data_as_paquets/")
numeric_cols = ["fare_amount", "trip_distance", "tip_amount", "total_amount",
                "passenger_count", "extra", "mta_tax", "tolls_amount",
                "improvement_surcharge", "congestion_surcharge", "airport_fee"]

df_list = []

for f in all_files:
    print("Reading:", f.name)
    df = spark.read.parquet(f.path)
    
    # cast numeric columns to double
    for c in numeric_cols:
        df = df.withColumn(c, col(c).cast("double"))
    
    df_list.append(df)

# union all files safely
df = reduce(lambda d1,d2: d1.unionByName(d2, allowMissingColumns=True), df_list)

print("✅ All files combined safely")
print("Total rows:", df.count())

Reading: yellow_tripdata_2022-01.parquet
Reading: yellow_tripdata_2022-02.parquet
Reading: yellow_tripdata_2022-03.parquet
Reading: yellow_tripdata_2022-04.parquet
Reading: yellow_tripdata_2022-05.parquet
Reading: yellow_tripdata_2022-06.parquet
Reading: yellow_tripdata_2022-07.parquet
Reading: yellow_tripdata_2022-08.parquet
Reading: yellow_tripdata_2022-09.parquet
Reading: yellow_tripdata_2022-10.parquet
Reading: yellow_tripdata_2022-11.parquet
Reading: yellow_tripdata_2022-12.parquet
Reading: yellow_tripdata_2023-01.parquet
Reading: yellow_tripdata_2023-02.parquet
Reading: yellow_tripdata_2023-03.parquet
Reading: yellow_tripdata_2023-04.parquet
Reading: yellow_tripdata_2023-05.parquet
Reading: yellow_tripdata_2023-06.parquet
Reading: yellow_tripdata_2023-07.parquet
Reading: yellow_tripdata_2023-08.parquet
Reading: yellow_tripdata_2023-09.parquet
Reading: yellow_tripdata_2023-10.parquet
Reading: yellow_tripdata_2023-11.parquet
Reading: yellow_tripdata_2023-12.parquet
Reading: yellow_

In [0]:
df.printSchema()
df.count()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



119136044

In [0]:
df.describe(
    "fare_amount",
    "trip_distance",
    "tip_amount"
).show()


+-------+------------------+-----------------+------------------+
|summary|       fare_amount|    trip_distance|        tip_amount|
+-------+------------------+-----------------+------------------+
|  count|         119136044|        119136044|         119136044|
|   mean|16.385721683690377|5.018125657756097| 4.683986916336596|
| stddev|12882.321816671132|446.0488857345717|12882.046863864147|
|    min|     -1.33391414E8|              0.0|            -411.0|
|    max|         401092.32|        398608.62|    1.3339136353E8|
+-------+------------------+-----------------+------------------+



In [0]:
df.groupBy("payment_type").count().show()


+------------+--------+
|payment_type|   count|
+------------+--------+
|           0| 6768891|
|           2|19708486|
|           1|90394854|
|           3|  726928|
|           5|      12|
|           4| 1536873|
+------------+--------+



In [0]:
from pyspark.sql.functions import col,sum

df.select([
    (col(c).isNull().cast("int")).alias(c)
    for c in df.columns
]).groupBy().sum().show()


+-------------+-------------------------+--------------------------+--------------------+------------------+---------------+-----------------------+-----------------+-----------------+-----------------+----------------+----------+------------+---------------+-----------------+--------------------------+-----------------+-------------------------+----------------+
|sum(VendorID)|sum(tpep_pickup_datetime)|sum(tpep_dropoff_datetime)|sum(passenger_count)|sum(trip_distance)|sum(RatecodeID)|sum(store_and_fwd_flag)|sum(PULocationID)|sum(DOLocationID)|sum(payment_type)|sum(fare_amount)|sum(extra)|sum(mta_tax)|sum(tip_amount)|sum(tolls_amount)|sum(improvement_surcharge)|sum(total_amount)|sum(congestion_surcharge)|sum(airport_fee)|
+-------------+-------------------------+--------------------------+--------------------+------------------+---------------+-----------------------+-----------------+-----------------+-----------------+----------------+----------+------------+---------------+---------

In [0]:
df.orderBy("fare_amount", ascending=False).show(10)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-------+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|  extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-------+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-07 11:36:43|  2022-01-07 11:47:12|            1.0|          3.3|       1.0|                 N|         107|         140|           4|  401092.32|    2.5|    0.5| 

In [0]:
df.orderBy("trip_distance", ascending=False).show(10)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-11-28 19:28:00|  2024-11-28 19:42:00|           NULL|    398608.62|      NULL|              NULL|         151|         140|           0|      22.04|  0.0|    0.5|       0.

In [0]:
df.groupBy("tpep_pickup_datetime").count().show()


+--------------------+-----+
|tpep_pickup_datetime|count|
+--------------------+-----+
| 2022-01-01 00:44:10|    2|
| 2022-01-01 00:35:21|    1|
| 2022-01-01 01:29:31|    2|
| 2022-01-01 01:25:21|    1|
| 2022-01-01 02:40:13|    2|
| 2022-01-01 02:07:25|    1|
| 2022-01-01 02:30:12|    2|
| 2022-01-01 02:08:13|    1|
| 2022-01-01 03:57:26|    1|
| 2022-01-01 03:42:02|    1|
| 2022-01-01 05:03:18|    1|
| 2022-01-01 07:12:15|    1|
| 2022-01-01 08:51:15|    1|
| 2022-01-01 08:24:51|    1|
| 2022-01-01 09:48:14|    1|
| 2022-01-01 09:40:13|    1|
| 2022-01-01 10:42:32|    1|
| 2022-01-01 10:31:04|    3|
| 2022-01-01 10:51:43|    1|
| 2022-01-01 10:05:45|    1|
+--------------------+-----+
only showing top 20 rows


In [0]:
from pyspark.sql.functions import hour

df.groupBy(hour("tpep_pickup_datetime").alias("heure")).count().orderBy("heure").show(24)


+-----+-------+
|heure|  count|
+-----+-------+
|    0|3383701|
|    1|2232426|
|    2|1471588|
|    3| 971233|
|    4| 681677|
|    5| 743191|
|    6|1734671|
|    7|3318702|
|    8|4526494|
|    9|5050184|
|   10|5492817|
|   11|5966753|
|   12|6482830|
|   13|6684765|
|   14|7160575|
|   15|7361851|
|   16|7395226|
|   17|8063726|
|   18|8458571|
|   19|7548725|
|   20|6700755|
|   21|6660742|
|   22|6160901|
|   23|4883940|
+-----+-------+



In [0]:
df.filter("fare_amount <= 0 OR trip_distance <= 0").count()


3385000

**_Nettoyage
_**

In [0]:

df = df.dropDuplicates()

total_rows = df.count()
distinct_rows = df.distinct().count()
print(f"Total rows after removing duplicates: {total_rows}")
print(f"Distinct rows after removing duplicates: {distinct_rows}")


Total rows after removing duplicates: 119136037
Distinct rows after removing duplicates: 119136037


In [0]:
from pyspark.sql.functions import when, col

df = df.withColumn(
    "tpep_pickup_datetime",
    when(col("tpep_pickup_datetime") > col("tpep_dropoff_datetime"), col("tpep_dropoff_datetime"))
    .otherwise(col("tpep_pickup_datetime"))
)

df = df.withColumn(
    "tpep_dropoff_datetime",
    when(col("tpep_pickup_datetime") > col("tpep_dropoff_datetime"), col("tpep_pickup_datetime"))
    .otherwise(col("tpep_dropoff_datetime"))
)

df.filter(col("tpep_pickup_datetime") > col("tpep_dropoff_datetime")).count()


0

In [0]:
df = df.withColumn(
    "passenger_count",
    when(col("passenger_count").isNull(), 1).otherwise(col("passenger_count"))
)
# vérifier s'il reste des nulls
df.filter(col("passenger_count").isNull()).count()

0

In [0]:
df = df.withColumn(
    "store_and_fwd_flag",
    when(col("store_and_fwd_flag").isNull(), "N").otherwise(col("store_and_fwd_flag"))
)

# Vérifier
df.filter(col("store_and_fwd_flag").isNull()).count()

0

In [0]:
df = df.withColumn(
    "congestion_surcharge",
    when(col("congestion_surcharge").isNull(), 0).otherwise(col("congestion_surcharge"))
)
# vérifier s'il reste des nulls
df.filter(col("congestion_surcharge").isNull()).count()

0

In [0]:
df = df.withColumn(
    "airport_fee",
    when(col("airport_fee").isNull(), 0).otherwise(col("airport_fee"))
)
# vérifier s'il reste des nulls
df.filter(col("airport_fee").isNull()).count()

0

In [0]:
for c in df.columns:
    print(c, df.filter(col(c).isNull()).count())


VendorID 0
tpep_pickup_datetime 0
tpep_dropoff_datetime 0
passenger_count 0
trip_distance 0
RatecodeID 6768891
store_and_fwd_flag 0
PULocationID 0
DOLocationID 0
payment_type 0
fare_amount 0
extra 0
mta_tax 0
tip_amount 0
tolls_amount 0
improvement_surcharge 0
total_amount 0
congestion_surcharge 0
airport_fee 0


In [0]:
from pyspark.sql.functions import col, when

df = df.withColumn(
    "RatecodeID",
    when(col("RatecodeID").isNull(),1) 
    .otherwise(col("RatecodeID"))
)


**_Statistics_**

_Statistiques descriptives populationnelles_

In [0]:
df.describe(
    "fare_amount",
    "trip_distance",
    "tip_amount",
    "total_amount"
).show()


+-------+------------------+-----------------+------------------+------------------+
|summary|       fare_amount|    trip_distance|        tip_amount|      total_amount|
+-------+------------------+-----------------+------------------+------------------+
|  count|         119136037|        119136037|         119136037|         119136037|
|   mean|16.385721167475328|5.018125625245977| 4.683986998076027|25.984163294125178|
| stddev|12882.322195128369|446.0488988342729|12882.047242315051| 84.37536140847237|
|    min|     -1.33391414E8|              0.0|            -411.0|           -2567.8|
|    max|         401092.32|        398608.62|    1.3339136353E8|         401095.62|
+-------+------------------+-----------------+------------------+------------------+



_Prix moyen d’une course_

In [0]:
mean_fare_pop = df.agg({"fare_amount": "avg"}).collect()[0][0]
mean_fare_pop


16.385721167475133

_Distance moyenne_

In [0]:
mean_dist_pop = df.agg({"trip_distance": "avg"}).collect()[0][0]
mean_dist_pop


5.018125625246372

_Durée moyenne des courses_

In [0]:
from pyspark.sql.functions import col as col, unix_timestamp

#Ajout colonne trip_duration_min (en minutes) à partir de tpep_pickup_datetime et tpep_dropoff_datetime
#unix_timestamp pour convertir les timestamps en secondes
#(unix_timestamp)

df_pop = df.withColumn(
    "trip_duration_min",
    (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60
)


In [0]:
mean_duration_pop = df_pop.agg({"trip_duration_min": "avg"}).collect()[0][0]
mean_duration_pop


29.458122072668914

_Proportion réelle des courses avec tip > 0_

In [0]:
from pyspark.sql.functions import when

prop_tip_pop = df.select(
    (when(col("tip_amount") > 0, 1).otherwise(0)).alias("tip_positive")
).agg({"tip_positive": "avg"}).collect()[0][0]

prop_tip_pop


0.7441899884583202

_Distribution populationnelle_

Par heure

In [0]:
from pyspark.sql.functions import hour

df.groupBy(hour("tpep_pickup_datetime").alias("hour")).count().orderBy("hour").show(24)


+----+-------+
|hour|  count|
+----+-------+
|   0|3383706|
|   1|2232427|
|   2|1471588|
|   3| 971234|
|   4| 681679|
|   5| 743190|
|   6|1734673|
|   7|3318701|
|   8|4526496|
|   9|5050192|
|  10|5492824|
|  11|5966753|
|  12|6482827|
|  13|6684763|
|  14|7160571|
|  15|7361842|
|  16|7395224|
|  17|8063725|
|  18|8458567|
|  19|7548726|
|  20|6700753|
|  21|6660741|
|  22|6160901|
|  23|4883934|
+----+-------+



Par jour de la semaine

In [0]:
from pyspark.sql.functions import date_format

df.groupBy(date_format("tpep_pickup_datetime", "EEEE").alias("day")).count().show()


+---------+--------+
|      day|   count|
+---------+--------+
|  Tuesday|17221151|
| Saturday|17639880|
| Thursday|18563492|
|   Friday|17873036|
|   Sunday|14903210|
|Wednesday|17939394|
|   Monday|14995874|
+---------+--------+



Par semaine ISO

In [0]:
from pyspark.sql.functions import weekofyear

df.groupBy(weekofyear("tpep_pickup_datetime").alias("week")).count().show()


+----+-------+
|week|  count|
+----+-------+
|  29|2247630|
|  24|2449494|
|  39|2440537|
|  38|2030109|
|  15|2428083|
|  20|2602951|
|  52|1795558|
|  21|2286235|
|  44|2550831|
|  22|2321778|
|  26|2190934|
|  17|2378680|
|  47|2212748|
|   1|1941218|
|  48|2347433|
|  30|2159733|
|  46|2373280|
|  23|2526833|
|  42|2569987|
|   4|2003195|
+----+-------+
only showing top 20 rows


_Statistiques par zone géographique_

In [0]:
from pyspark.sql.functions import avg, stddev, count

fare_by_zone_pop = df.groupBy("PULocationID").agg(
    avg("fare_amount").alias("mean_fare"),
    stddev("fare_amount").alias("std_fare"),
    count("*").alias("n")
)

fare_by_zone_pop.orderBy(col("mean_fare").desc()).show(10)


+------------+------------------+------------------+-------+
|PULocationID|         mean_fare|          std_fare|      n|
+------------+------------------+------------------+-------+
|           1|  84.8890804326265| 50.77521916039118|  21173|
|         204| 75.27833333333334| 60.88433029229476|     60|
|         265| 67.39403705401244| 70.52304119342067| 139634|
|           5| 66.48216733870967|11.507746236830359|    992|
|          84| 63.12961538461537| 51.27548448969637|     52|
|          44|59.569974457215864|22.111011462087948|    783|
|         251| 59.55428571428573| 65.77210624573307|    196|
|         132| 55.72660096390528|31.476207398106652|5897672|
|         109|54.214925373134314| 44.49351024311059|    134|
|          10| 53.76089973924067| 40.75294645585753|  41801|
+------------+------------------+------------------+-------+
only showing top 10 rows


_OUTLIERS population_

In [0]:
q1 = df.approxQuantile("fare_amount", [0.25], 0.01)[0]
q3 = df.approxQuantile("fare_amount", [0.75], 0.01)[0]
IQR = q3 - q1

outliers_pop = df.filter(
    (col("fare_amount") < q1 - 1.5*IQR) |
    (col("fare_amount") > q3 + 1.5*IQR)
)

outliers_pop.count()


12490247

In [0]:
#Comparer la moyenne avec et sans outliers :
mean_all = df.agg({"fare_amount": "avg"}).collect()[0][0]
mean_no_outliers = df.filter(
    (col("fare_amount") >= q1 - 1.5*IQR) &
    (col("fare_amount") <= q3 + 1.5*IQR)
).agg({"fare_amount": "avg"}).collect()[0][0]

mean_all, mean_no_outliers


(16.385721167475133, 13.523317196862621)

_Ratio tip/fare par type de paiement_

In [0]:
#Ajouter ratio :
df_ratio = df.withColumn("tip_fare_ratio", col("tip_amount")/col("fare_amount"))
df_ratio = df_ratio.filter(col("tip_fare_ratio").isNotNull())


In [0]:
#Map payment_type :
payment_map_expr = (col("payment_type") == 1).cast("int")  # 1=card


In [0]:
#Agrégation :
from pyspark.sql.functions import col, try_divide, avg, stddev, count

# ratio sécurisé
df_ratio = df.withColumn(
    "tip_fare_ratio",
    try_divide(col("tip_amount"), col("fare_amount"))
)

# garder uniquement les ratios non nuls
df_ratio = df_ratio.filter(col("tip_fare_ratio").isNotNull())

# agrégation par type de paiement
ratio_stats_pop = df_ratio.groupBy("payment_type").agg(
    avg("tip_fare_ratio").alias("mean_ratio"),
    stddev("tip_fare_ratio").alias("std_ratio"),
    count("*").alias("n")
)

ratio_stats_pop.show()


+------------+--------------------+--------------------+--------+
|payment_type|          mean_ratio|           std_ratio|       n|
+------------+--------------------+--------------------+--------+
|           0| 0.08553333469260681| 0.16254497706004045| 6764378|
|           2|-3.44085147895815...|0.005448210519811257|19687622|
|           1|  0.2740960486595459|  15.897292008958246|90387135|
|           3|0.002370810936322...| 0.16307137401937447|  720027|
|           4|-5.87558496481138E-4|  0.1495010296755361| 1529484|
|           5|                 0.0|                 0.0|       4|
+------------+--------------------+--------------------+--------+

