# Lister les fichiers présents dans le DBFS

In [0]:
display(dbutils.fs.ls("/FileStore/tables/brief_data/2024/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_01.parquet,yellow_tripdata_2024_01.parquet,49961641,1760537064000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_02.parquet,yellow_tripdata_2024_02.parquet,50349284,1760537064000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_03.parquet,yellow_tripdata_2024_03.parquet,60078280,1760537070000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_04.parquet,yellow_tripdata_2024_04.parquet,59133625,1760537071000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_05.parquet,yellow_tripdata_2024_05.parquet,62553128,1760537075000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_06.parquet,yellow_tripdata_2024_06.parquet,59859922,1760537076000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_07.parquet,yellow_tripdata_2024_07.parquet,52299432,1760537059000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_08.parquet,yellow_tripdata_2024_08.parquet,51067350,1760537059000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_09.parquet,yellow_tripdata_2024_09.parquet,61170186,1760537054000
dbfs:/FileStore/tables/brief_data/2024/yellow_tripdata_2024_10.parquet,yellow_tripdata_2024_10.parquet,64346071,1760537054000


In [0]:
display(dbutils.fs.ls("/FileStore/tables/brief_data/2025/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_01.parquet,yellow_tripdata_2025_01.parquet,59158238,1760537519000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_02.parquet,yellow_tripdata_2025_02.parquet,60343086,1760537518000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_03.parquet,yellow_tripdata_2025_03.parquet,69964745,1760537514000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_04.parquet,yellow_tripdata_2025_04.parquet,67352824,1760537513000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_05.parquet,yellow_tripdata_2025_05.parquet,77837865,1760537507000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_06.parquet,yellow_tripdata_2025_06.parquet,73542954,1760537506000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_07.parquet,yellow_tripdata_2025_07.parquet,66943728,1760537500000
dbfs:/FileStore/tables/brief_data/2025/yellow_tripdata_2025_08.parquet,yellow_tripdata_2025_08.parquet,62293743,1760537500000


In [0]:

display(dbutils.fs.ls("/FileStore/tables/brief_data/2025_results/"))


path,name,size,modificationTime
dbfs:/FileStore/tables/brief_data/2025_results/avg_amount_per_passenger_count.parquet/,avg_amount_per_passenger_count.parquet/,0,1760618566000
dbfs:/FileStore/tables/brief_data/2025_results/avg_distance_per_payment_type.parquet/,avg_distance_per_payment_type.parquet/,0,1760618167000
dbfs:/FileStore/tables/brief_data/2025_results/avg_trips_duration_per_month.parquet/,avg_trips_duration_per_month.parquet/,0,1760610529000
dbfs:/FileStore/tables/brief_data/2025_results/tip_per_month.parquet/,tip_per_month.parquet/,0,1760618743000
dbfs:/FileStore/tables/brief_data/2025_results/top10_depart_zones.parquet/,top10_depart_zones.parquet/,0,1760610184000


In [0]:
display(dbutils.fs.ls("/FileStore/tables/brief_data/2024_results/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/brief_data/2024_results/avg_amount_per_passenger_count.parquet/,avg_amount_per_passenger_count.parquet/,0,1760624725000
dbfs:/FileStore/tables/brief_data/2024_results/avg_distance_per_payment_type.parquet/,avg_distance_per_payment_type.parquet/,0,1760624830000
dbfs:/FileStore/tables/brief_data/2024_results/avg_trips_duration_per_month.parquet/,avg_trips_duration_per_month.parquet/,0,1760624525000
dbfs:/FileStore/tables/brief_data/2024_results/tip_per_month.parquet/,tip_per_month.parquet/,0,1760624772000
dbfs:/FileStore/tables/brief_data/2024_results/top10_depart_zones.parquet/,top10_depart_zones.parquet/,0,1760624292000


# 2025

## Creer une base de données pour les résultats dans hive metastore

In [0]:
# Create db in the hive metaverse for 2025 results

spark.sql("CREATE DATABASE IF NOT EXISTS brief_data_2025_results LOCATION 'dbfs:/user/hive/warehouse/brief_data_2025_results.db'")

## Les requetes

In [0]:
from pyspark.sql.functions import col, month, year, avg, sum as spark_sum, count, desc, to_timestamp, expr, rank
from pyspark.sql.window import Window

# Charger les fichiers de 2025
df_2025 = spark.read.parquet("dbfs:/FileStore/tables/brief_data/2025/*")

# Convertir les colonnes de dates en timestamp
df_2025 = df_2025.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
                 .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))

# Check if month 12 exists
    # df_2025.select("tpep_pickup_datetime").where(month(col("tpep_pickup_datetime")) == 12).show()
# Ajouter colonne mois
df_2025 = df_2025.withColumn("month", month(col("tpep_pickup_datetime")))

# 1. Top 10 zones de départ les plus fréquentées chaque mois

# Aggregate trip counts per month and pickup zone
zone_counts = (
    df_2025.groupBy("month", "PULocationID")
    .agg(count("*").alias("nb_trajets"))
)

# Define a window that ranks zones within each month by trip count
windowSpec = Window.partitionBy("month").orderBy(desc("nb_trajets"))

# Add rank column and filter top 10
top10_depart_zones = (
    zone_counts
    .withColumn("rank", rank().over(windowSpec))
    .filter(col("rank") <= 10)
    .orderBy("month", "rank")
)

top10_depart_zones.show()

top10_depart_zones.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2025_results/top10_depart_zones.parquet")
top10_depart_zones.write.mode("overwrite").saveAsTable("brief_data_2025_results.top10_depart_zones")

+-----+------------+----------+----+
|month|PULocationID|nb_trajets|rank|
+-----+------------+----------+----+
|    1|         161|    169977|   1|
|    1|         237|    163704|   2|
|    1|         236|    155647|   3|
|    1|         132|    146138|   4|
|    1|         230|    125830|   5|
|    1|         186|    119131|   6|
|    1|         162|    117930|   7|
|    1|         142|    110584|   8|
|    1|         239|     96614|   9|
|    1|         163|     95905|  10|
|    2|         161|    161413|   1|
|    2|         237|    157292|   2|
|    2|         236|    148094|   3|
|    2|         132|    125460|   4|
|    2|         230|    114529|   5|
|    2|         162|    113721|   6|
|    2|         186|    113389|   7|
|    2|         142|    104361|   8|
|    2|         234|    102638|   9|
|    2|         170|     97272|  10|
+-----+------------+----------+----+
only showing top 20 rows


In [0]:

# 2. Durée moyenne des trajets par mois (en minutes)
df_2025 = df_2025.withColumn(
    "trip_duration_min",
    (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60
)
avg_trips_duration_per_month = (
    df_2025.groupBy("month")
    .agg(avg("trip_duration_min").alias("duree_moyenne_min"))
    .orderBy("month")
)
display(avg_trips_duration_per_month)
avg_trips_duration_per_month.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2025_results/avg_trips_duration_per_month.parquet")
avg_trips_duration_per_month.write.mode("overwrite").saveAsTable("brief_data_2025_results.avg_trips_duration_per_month")



month,duree_moyenne_min
1,15.01813008682324
2,15.407841873926987
3,15.989197697725618
4,16.62802115885467
5,17.911733002402833
6,17.409922971566605
7,17.09944761066511
8,17.27992699181318
9,15.583333333333334
12,16.81439393939394


In [0]:
# 3. Distance moyenne par type de paiement
avg_distance_per_payment_type = (
    df_2025.groupBy("payment_type")
    .agg(avg(col("trip_distance").cast("double")).alias("distance_moyenne"))
)
display(avg_distance_per_payment_type)
avg_distance_per_payment_type.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2025_results/avg_distance_per_payment_type.parquet")
avg_distance_per_payment_type.write.mode("overwrite").saveAsTable("brief_data_2025_results.avg_distance_per_payment_type")


payment_type,distance_moyenne
0,17.91087064637173
1,3.558946986194505
3,2.7744847822415544
2,3.3514405170765094
4,4.340394715432738
5,3.333333333333333


In [0]:

# 4. Montant moyen des courses par nombre de passagers
avg_amount_per_passenger_count = (
    df_2025.groupBy("passenger_count")
    .agg(avg(col("total_amount").cast("double")).alias("montant_moyen"))
    .orderBy("passenger_count")
)
display(avg_amount_per_passenger_count)

avg_amount_per_passenger_count.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2025_results/avg_amount_per_passenger_count.parquet")
avg_amount_per_passenger_count.write.mode("overwrite").saveAsTable("brief_data_2025_results.avg_amount_per_passenger_count")




passenger_count,montant_moyen
,21.847981649302568
0.0,25.28628104549295
1.0,27.099586266083872
2.0,30.54720962407008
3.0,30.46626322013798
4.0,33.16086588295623
5.0,26.555476194211355
6.0,26.82071756075189
7.0,85.40904761904763
8.0,99.59546875


In [0]:
# 5. Somme totale des pourboires versés chaque mois
tip_per_month = (
    df_2025.groupBy("month")
    .agg(spark_sum(col("tip_amount").cast("double")).alias("somme_pourboires"))
    .orderBy("month")
)
display(tip_per_month)

tip_per_month.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2025_results/tip_per_month.parquet")
tip_per_month.write.mode("overwrite").saveAsTable("brief_data_2025_results.tip_per_month")


month,somme_pourboires
1,10286048.369995195
2,9766920.479996877
3,11850613.539990015
4,11830980.689992746
5,13124065.449988317
6,11899584.699991656
7,10473105.26999529
8,9711516.75999626
9,4.27
12,85.07


## Tester la connexion à Azure SQL Database

In [0]:
%pip install python-dotenv


Collecting python-dotenv
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.1.1
[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%restart_python

In [0]:
import os
from dotenv import load_dotenv

# Load the .env file
load_dotenv("/Workspace/Users/csaad.ext@simplonformations.onmicrosoft.com/.env")  

# Retrieve environment variables
user = os.getenv("SQL_USER")
password = os.getenv("SQL_PASSWORD")
driver = os.getenv("SQL_DRIVER")
jdbc_url = os.getenv("SQL_JDBC_URL")

# Build connection properties
connection_properties = {
    "user": user,
    "password": password,
    "driver": driver
}

# Test connection
try:
    df = spark.read.jdbc(url=jdbc_url, table="(SELECT TOP 5 name FROM sys.tables) as t", properties=connection_properties)
    display(df)
    print("✅ Connection successful")
except Exception as e:
    print("❌ Connection failed:", e)


name


✅ Connection successful


## Enregistrer les résultats dans Azure SQL Database

In [0]:
tables = [
    "top10_depart_zones",
    "avg_trips_duration_per_month",
    "avg_distance_per_payment_type",
    "avg_amount_per_passenger_count",
    "tip_per_month"
]

for table in tables:
    df = spark.table(f"brief_data_2025_results.{table}")
    df.write.jdbc(
        url=jdbc_url,
        table='brief_data_results_2025.{table}',
        mode="overwrite",
        properties=connection_properties
    )

# 2024

## Creer une base de données pour les résultats dans hive metastore

In [0]:
# Create db in the hive metaverse for 2024 results

spark.sql("CREATE DATABASE IF NOT EXISTS brief_data_2024_results LOCATION 'dbfs:/user/hive/warehouse/brief_data_2024_results.db'")

DataFrame[]

## Les requetes

In [0]:
from pyspark.sql.functions import col, month, year, avg, sum as spark_sum, count, desc, to_timestamp, expr, rank
from pyspark.sql.window import Window

# Charger les fichiers de 2024
df_2024 = spark.read.parquet("dbfs:/FileStore/tables/brief_data/2024/*")

# Convertir les colonnes de dates en timestamp
df_2024 = df_2024.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"))) \
                 .withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))


# Ajouter colonne mois
df_2024 = df_2024.withColumn("month", month(col("tpep_pickup_datetime")))

# 1. Top 10 zones de départ les plus fréquentées chaque mois

# Aggregate trip counts per month and pickup zone
zone_counts = (
    df_2024.groupBy("month", "PULocationID")
    .agg(count("*").alias("nb_trajets"))
)

# Define a window that ranks zones within each month by trip count
windowSpec = Window.partitionBy("month").orderBy(desc("nb_trajets"))

# Add rank column and filter top 10
top10_depart_zones = (
    zone_counts
    .withColumn("rank", rank().over(windowSpec))
    .filter(col("rank") <= 10)
    .orderBy("month", "rank")
)

top10_depart_zones.show()

top10_depart_zones.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2024_results/top10_depart_zones.parquet")
top10_depart_zones.write.mode("overwrite").saveAsTable("brief_data_2024_results.top10_depart_zones")

+-----+------------+----------+----+
|month|PULocationID|nb_trajets|rank|
+-----+------------+----------+----+
|    1|         132|    145243|   1|
|    1|         161|    143470|   2|
|    1|         237|    142709|   3|
|    1|         236|    136464|   4|
|    1|         162|    106718|   5|
|    1|         230|    106324|   6|
|    1|         186|    104522|   7|
|    1|         142|    104081|   8|
|    1|         138|     89535|   9|
|    1|         239|     88474|  10|
|    2|         161|    147089|   1|
|    2|         237|    140800|   2|
|    2|         236|    134000|   3|
|    2|         132|    126802|   4|
|    2|         162|    105937|   5|
|    2|         230|    102280|   6|
|    2|         186|    100416|   7|
|    2|         142|     97253|   8|
|    2|         239|     90849|   9|
|    2|         163|     89182|  10|
+-----+------------+----------+----+
only showing top 20 rows


In [0]:

# 2. Durée moyenne des trajets par mois (en minutes)
df_2024 = df_2024.withColumn(
    "trip_duration_min",
    (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60
)
avg_trips_duration_per_month = (
    df_2024.groupBy("month")
    .agg(avg("trip_duration_min").alias("duree_moyenne_min"))
    .orderBy("month")
)
display(avg_trips_duration_per_month)
avg_trips_duration_per_month.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2024_results/avg_trips_duration_per_month.parquet")
avg_trips_duration_per_month.write.mode("overwrite").saveAsTable("brief_data_2024_results.avg_trips_duration_per_month")



month,duree_moyenne_min
1,15.615235721383506
2,15.98241409185477
3,16.672463906466835
4,17.04388893552472
5,18.023203569539906
6,17.573992443244656
7,17.224363835267173
8,17.36360182000267
9,18.62118618323312
10,18.263951978640225


In [0]:
# 3. Distance moyenne par type de paiement
avg_distance_per_payment_type = (
    df_2024.groupBy("payment_type")
    .agg(avg(col("trip_distance").cast("double")).alias("distance_moyenne"))
)
display(avg_distance_per_payment_type)
avg_distance_per_payment_type.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2024_results/avg_distance_per_payment_type.parquet")
avg_distance_per_payment_type.write.mode("overwrite").saveAsTable("brief_data_2024_results.avg_distance_per_payment_type")


payment_type,distance_moyenne
0,18.33296200997551
1,3.535833242890832
3,2.621370178547544
2,3.35443485735263
4,3.571944382210582
5,0.0


In [0]:

# 4. Montant moyen des courses par nombre de passagers
avg_amount_per_passenger_count = (
    df_2024.groupBy("passenger_count")
    .agg(avg(col("total_amount").cast("double")).alias("montant_moyen"))
    .orderBy("passenger_count")
)
display(avg_amount_per_passenger_count)

avg_amount_per_passenger_count.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2024_results/avg_amount_per_passenger_count.parquet")
avg_amount_per_passenger_count.write.mode("overwrite").saveAsTable("brief_data_2024_results.avg_amount_per_passenger_count")




passenger_count,montant_moyen
,24.61012771458731
0.0,25.469963000243236
1.0,27.42156528427329
2.0,31.19777104348219
3.0,30.665225669674356
4.0,32.86875704543564
5.0,28.208674757417526
6.0,27.04058290623641
7.0,68.85535714285714
8.0,90.88848958333334


In [0]:
# 5. Somme totale des pourboires versés chaque mois
tip_per_month = (
    df_2024.groupBy("month")
    .agg(spark_sum(col("tip_amount").cast("double")).alias("somme_pourboires"))
    .orderBy("month")
)
display(tip_per_month)

tip_per_month.write.mode("overwrite").parquet("dbfs:/FileStore/tables/brief_data/2024_results/tip_per_month.parquet")
tip_per_month.write.mode("overwrite").saveAsTable("brief_data_2024_results.tip_per_month")


month,somme_pourboires
1,9889652.120006042
2,9933402.770005118
3,11431414.430006536
4,11351432.030008027
5,12371062.920007192
6,11510536.2200056
7,10041391.480004491
8,9724912.200004471
9,12021705.790005175
10,13130067.950006422


## Enregistrer les résultats dans Azure SQL Database

In [0]:
tables = [
    "top10_depart_zones",
    "avg_trips_duration_per_month",
    "avg_distance_per_payment_type",
    "avg_amount_per_passenger_count",
    "tip_per_month"
]

for table in tables:
    df = spark.table(f"brief_data_2024_results.{table}")
    df.write.jdbc(
        url=jdbc_url,
        table=f"brief_data_results_2024.{table}",
        mode="overwrite",
        properties=connection_properties
    )