In [0]:
# ========================= #
# ---------- LOAD --------- #
# ========================= #
from pyspark.sql.functions import col, year, month
from functools import reduce

# Source tables are hive_metastore.default.yellow_tripdata_<YYYY>..., matched by prefix
TABLE_PREFIX = "yellow_tripdata_"


def _load_year(year_value, table_names):
    """Union (by column name) every `default` table whose name starts with
    yellow_tripdata_<year_value>.

    Returns None when no table matches, instead of failing inside `reduce`
    (reduce() over an empty list with no initial value raises TypeError).
    """
    prefix = f"{TABLE_PREFIX}{year_value}"
    frames = [spark.table(f"hive_metastore.default.{name}")
              for name in table_names if name.startswith(prefix)]
    if not frames:
        print(f"[warn] no table starts with '{prefix}' - year {year_value} skipped")
        return None
    return reduce(lambda df1, df2: df1.unionByName(df2), frames)


def _report_year(df_year, year_value):
    """Display a preview and print row counts for one year's DataFrame.

    The counts cover total rows, pickup / dropoff / both inside `year_value`,
    and rows with at least one NULL column. Does nothing when df_year is None.
    """
    if df_year is None:
        return
    pickup_in_year = year('tpep_pickup_datetime') == year_value
    dropoff_in_year = year('tpep_dropoff_datetime') == year_value
    any_null = reduce(lambda x, y: x | y, [col(c).isNull() for c in df_year.columns])

    display(df_year)
    print(f"rows in {year_value} df: {df_year.count()}")
    print(f"pickup in {year_value} df: {df_year.filter(pickup_in_year).count()}")
    print(f"dropoff in {year_value} df: {df_year.filter(dropoff_in_year).count()}")
    print(f"travel in {year_value} df: {df_year.filter(dropoff_in_year & pickup_in_year).count()}")
    print(f"rows in {year_value} df with at least one NULL: {df_year.filter(any_null).count()}")


def _keep_year(df_year, year_value, max_month=None):
    """Keep trips whose pickup AND dropoff both fall in `year_value`.

    When `max_month` is given, both timestamps must also fall in months
    1..max_month. A None input is passed through unchanged.
    """
    if df_year is None:
        return None
    keep = (year('tpep_dropoff_datetime') == year_value) & (year('tpep_pickup_datetime') == year_value)
    if max_month is not None:
        keep = keep & (month('tpep_pickup_datetime') <= max_month) & (month('tpep_dropoff_datetime') <= max_month)
    return df_year.filter(keep)


# 1) List & load tables (by prefix), union per year
tables = [t.name for t in spark.catalog.listTables("default")]
df_2024 = _load_year(2024, tables)
df_2025 = _load_year(2025, tables)

# 2) Quick preview + counts (before filtering)
_report_year(df_2024, 2024)
_report_year(df_2025, 2025)

# 3) Keep only good years/months
##   - 2024: both pickup & dropoff in 2024
##   - 2025: both in 2025 AND only months 1..9 (Jan..Sep)
df_2024 = _keep_year(df_2024, 2024)
df_2025 = _keep_year(df_2025, 2025, max_month=9)

# 4) Union (2024 + 2025); a year with no source table is None and is skipped
dfs = [d for d in [df_2024, df_2025] if d is not None]
if not dfs:
    raise ValueError("no yellow_tripdata_2024*/2025* table found in hive_metastore.default")
result_df = reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)
print(f"after union: {result_df.count()}")

## drop exact duplicate rows
df = result_df.dropDuplicates()

display(df)
print(f"df total rows: {df.count()}")


In [0]:
# Inspect the schema of the 2024 + 2025 union built in the LOAD cell
# (columns missing from one year are filled with NULL by
# unionByName(allowMissingColumns=True)).
result_df.printSchema()

In [0]:
# =============================== #
# ----------- FILTER ------------ #
# =============================== #
from pyspark.sql.functions import col, trim, upper, to_timestamp, unix_timestamp, lit
from pyspark.sql import functions as F

# 0) Base DF sanity (timestamps + duplicates)
#    make sure timestamps are real timestamps, then drop exact duplicates
df = (df
      .withColumn("tpep_pickup_datetime",  to_timestamp("tpep_pickup_datetime"))
      .withColumn("tpep_dropoff_datetime", to_timestamp("tpep_dropoff_datetime"))
     ).dropDuplicates()

print(f"[input] rows after typing+dedup: {df.count()}")

# 1) Load the reference table + minimal hygiene:
#    trim every field, drop NULL/empty keys, keep one row per (ref_type, code)
ref_raw = spark.table("hive_metastore.default.taxi_universal_ref")
ref = (
    ref_raw
    .select(
        trim(col("ref_type")).alias("ref_type"),
        trim(col("code")).alias("code"),
        trim(col("label")).alias("label")
    )
    .filter(col("ref_type").isNotNull() & col("code").isNotNull())
    .filter((col("ref_type") != "") & (col("code") != ""))
    .dropDuplicates(["ref_type","code"])
)


def _ref_slice(ref_type, code_alias, label_alias=None, upper_code=False):
    """Return the `ref` rows of one ref_type as (code[, label]) with the given aliases.

    When `upper_code` is set, the code is upper-cased (used for
    store_and_fwd_flag, which is matched case-insensitively). The result is
    de-duplicated on the code column. Upper-casing can make distinct codes
    such as 'y' and 'Y' collide, and a duplicated key would multiply trip
    rows in a LEFT join. If that happens, which label is kept is arbitrary.
    """
    code = upper(col("code")) if upper_code else col("code")
    selected = [code.alias(code_alias)]
    if label_alias is not None:
        selected.append(col("label").alias(label_alias))
    return (ref.filter(col("ref_type") == ref_type)
               .select(*selected)
               .dropDuplicates([code_alias]))


def _semi_filter(frame, column, codes_df, code_col):
    """Keep only the rows of `frame` whose `column`, cast to string, is listed in codes_df[code_col].

    A LEFT SEMI join never duplicates left rows. Rows whose `column` is NULL
    are dropped, because NULL never equals a code.
    NOTE(review): if `column` is stored as DOUBLE, cast("string") yields e.g.
    "1.0", which will not match a code "1". Check result_df.printSchema().
    """
    return frame.join(F.broadcast(codes_df),
                      col(column).cast("string") == col(code_col),
                      "left_semi")


def _add_label(frame, key_expr, labels_df, code_col):
    """LEFT-join one label column onto `frame` where key_expr == labels_df[code_col].

    The join key column from labels_df is dropped, so only the label column is
    added. Unmatched rows get a NULL label. No helper columns are left behind.
    """
    return (frame
            .join(F.broadcast(labels_df), key_expr == col(code_col), "left")
            .drop(code_col))


# 2) Slices for SEMI filters (keep only code columns for performance)
pay_ref_semi   = _ref_slice("payment_type",       "code_pay")
rate_ref_semi  = _ref_slice("RatecodeID",         "code_rate")
vend_ref_semi  = _ref_slice("VendorID",           "code_vendor")
store_ref_semi = _ref_slice("store_and_fwd_flag", "code_store", upper_code=True)

print("[refs] sizes (semi) →",
      "payment:", pay_ref_semi.count(),
      "| rate:", rate_ref_semi.count(),
      "| vendor:", vend_ref_semi.count(),
      "| store:", store_ref_semi.count())

# 3) Keep only rows whose codes appear in the reference lists (no row duplication)
clean_df = df
clean_df = _semi_filter(clean_df, "VendorID",   vend_ref_semi, "code_vendor")
clean_df = _semi_filter(clean_df, "RatecodeID", rate_ref_semi, "code_rate")

# store_and_fwd_flag: keep rows whose flag is a known code OR is NULL.
# The code list is tiny, so it is collected and matched with isin(). This keeps
# NULL flags even when the list is empty, and it avoids an OR join condition,
# which cannot run as an equi-join.
valid_store_flags = [row["code_store"] for row in store_ref_semi.collect()]
clean_df = clean_df.filter(
    col("store_and_fwd_flag").isNull() | upper(col("store_and_fwd_flag")).isin(valid_store_flags)
)

clean_df = _semi_filter(clean_df, "payment_type", pay_ref_semi, "code_pay")

print("Before refs filter:", df.count(), "→ After refs filter:", clean_df.count())

# 4) Simple sanity filters
#    build duration, then keep realistic trips
clean_df = clean_df.withColumn(
    "trip_duration_min",
    F.round(
        (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0,
        2
    )
)

clean_df = (
    clean_df
    .filter(col("tpep_pickup_datetime").isNotNull() & col("tpep_dropoff_datetime").isNotNull())
    .filter(col("PULocationID").isNotNull() & col("DOLocationID").isNotNull())
    .filter((col("trip_distance") > 0) & (col("trip_distance") < 100))          # (0, 100)
    .filter((col("trip_duration_min") >= 1) & (col("trip_duration_min") <= 180)) # [1, 180] minutes
    .filter((col("passenger_count") >= 1) & (col("passenger_count") <= 6))       # 1..6
    .filter(col("fare_amount")  >= 0)
    .filter(col("tip_amount")   >= 0)
    .filter(col("total_amount") > 0)
)

print("After sanity filters:", clean_df.count())

# =============================== #
# -------- ADD THE LABELS ------- #
# =============================== #

# Label slices (code + label), de-duplicated on the code so LEFT joins stay 1:1
pay_ref_lbl   = _ref_slice("payment_type",       "code_pay_lbl",    "payment_type_label")
rate_ref_lbl  = _ref_slice("RatecodeID",         "code_rate_lbl",   "RatecodeID_label")
vend_ref_lbl  = _ref_slice("VendorID",           "code_vendor_lbl", "Vendor_label")
store_ref_lbl = _ref_slice("store_and_fwd_flag", "code_store_lbl",  "store_and_fwd_flag_label", upper_code=True)

clean_df = _add_label(clean_df, col("payment_type").cast("string"), pay_ref_lbl,   "code_pay_lbl")
clean_df = _add_label(clean_df, col("RatecodeID").cast("string"),   rate_ref_lbl,  "code_rate_lbl")
clean_df = _add_label(clean_df, col("VendorID").cast("string"),     vend_ref_lbl,  "code_vendor_lbl")
clean_df = _add_label(clean_df, upper(col("store_and_fwd_flag")),   store_ref_lbl, "code_store_lbl")

print(f"After label joins: {clean_df.count()} rows")

# quick peek
display(clean_df.select(
    "tpep_pickup_datetime","tpep_dropoff_datetime","trip_distance","trip_duration_min",
    "passenger_count","fare_amount","tip_amount","total_amount",
    "VendorID","Vendor_label",
    "RatecodeID","RatecodeID_label",
    "store_and_fwd_flag","store_and_fwd_flag_label",
    "payment_type","payment_type_label"
).limit(30))



In [0]:
# Column names of the cleaned + labelled trips DataFrame (displayed as a list)
list(clean_df.columns)

In [0]:
# =============================================== #
# -- Top 10 pickup zones per month (2024–2025) -- #
# =============================================== #
from pyspark.sql import functions as F
from pyspark.sql.window import Window

TOP_N = 10  # number of pickup zones kept per (year, month)

# 1) aggregate trips by (year, month, PULocationID)
#    year/month are derived from the pickup time inside the groupBy, so this
#    cell no longer adds columns to (mutates) the shared clean_df
by_month_zone = (
    clean_df
    .groupBy(F.year("tpep_pickup_datetime").alias("year"),
             F.month("tpep_pickup_datetime").alias("month"),
             F.col("PULocationID"))
    .agg(F.count("*").alias("nb_trips"))
)

# 2) join zone lookup to get names/boroughs
zones = (spark.table("hive_metastore.default.taxi_zone_lookup")
         .select(F.col("LocationID").cast("int").alias("LocationID"),
                 F.col("Borough").alias("PU_Borough"),
                 F.col("Zone").alias("PU_Zone")))

by_month_zone_named = (
    by_month_zone
    .join(zones, by_month_zone.PULocationID == zones.LocationID, "left")
    .drop("LocationID")
)

# 3) rank within each (year, month) and keep the top N.
#    PULocationID breaks ties in nb_trips, so the ranking is deterministic
#    across runs, including which zone makes the cut at rank N.
w = (Window.partitionBy("year", "month")
           .orderBy(F.col("nb_trips").desc(), F.col("PULocationID").asc()))

top10_monthly = (
    by_month_zone_named
    .withColumn("rank", F.row_number().over(w))
    .filter(F.col("rank") <= TOP_N)
    .orderBy("year", "month", "rank")
)

display(top10_monthly.select("year","month","PULocationID","PU_Borough","PU_Zone","nb_trips","rank"))


In [0]:
# =============================================== #
# -- Average trip duration (minutes) per month -- #
# =============================================== #
from pyspark.sql import functions as F

# Mean trip_duration_min per (year, month) of the pickup timestamp,
# rounded to 2 decimals and listed chronologically.
pickup_year  = F.year("tpep_pickup_datetime").alias("year")
pickup_month = F.month("tpep_pickup_datetime").alias("month")
avg_duration = F.round(F.avg("trip_duration_min"), 2).alias("avg_trip_duration_min")

travel_duration_by_month = (
    clean_df.groupBy(pickup_year, pickup_month)
            .agg(avg_duration)
            .orderBy("year", "month")
)

display(travel_duration_by_month)

In [0]:
# ============================================= #
# --- Average trip distance by payment type --- #
# ============================================= #
from pyspark.sql import functions as F

# 1) compute average distance per payment_type
travel_distance_by_payment = (
    clean_df
    .groupBy("payment_type")
    .agg(F.round(F.avg("trip_distance"), 2).alias("avg_trip_distance"))
)

# 2) get payment labels from the ref table, with the same hygiene as the
#    FILTER cell: trimmed fields, no NULL/empty codes, one label per code
#    (a duplicated code would otherwise duplicate result rows in the join)
ref_df = (
    spark.table("hive_metastore.default.taxi_universal_ref")
    .filter(F.trim(F.col("ref_type")) == "payment_type")
    .select(F.trim(F.col("code")).alias("payment_type_code"),
            F.trim(F.col("label")).alias("payment_type_label"))
    .filter(F.col("payment_type_code").isNotNull() & (F.col("payment_type_code") != ""))
    .dropDuplicates(["payment_type_code"])
)

# 3) join labels; no helper column is kept (this DataFrame is exported to SQL).
#    Sorting happens after the join because a join does not preserve row order.
travel_distance_by_payment_labels = (
    travel_distance_by_payment
    .join(F.broadcast(ref_df),
          F.col("payment_type").cast("string") == F.col("payment_type_code"),
          "left")
    .drop("payment_type_code")
    .orderBy(F.col("avg_trip_distance").desc())
)

# 4) result
display(travel_distance_by_payment_labels.select(
    "payment_type", "payment_type_label", "avg_trip_distance"
))



In [0]:
# ============================================== #
# --- Average fare amount by passenger count --- #
# ============================================== #
from pyspark.sql import functions as F

# Reusable aggregate expressions: total fare and total passengers per group.
# Within a passenger_count group, fare / passengers is the average fare per passenger.
fare_total       = F.sum("fare_amount")
passengers_total = F.sum("passenger_count")

average_fare_by_passenger_count = (
    clean_df
    .groupBy("passenger_count")
    .agg(
        F.count("*").alias("nb_trips"),
        F.round(fare_total, 2).alias("total_fare_amount"),
        F.round(F.avg("fare_amount"), 2).alias("avg_fare_amount"),
        passengers_total.alias("total_passenger_count"),
        F.round(fare_total / passengers_total, 2).alias("avg_fare_per_passenger"),
    )
    .orderBy("passenger_count")
)

display(average_fare_by_passenger_count)


In [0]:
# ============================================== #
# ---------- Total tip amount per month -------- #
# ============================================== #
from pyspark.sql import functions as F

# Per pickup (year, month): trip count, total tips and mean tip, both rounded to 2 decimals.
month_keys = [
    F.year("tpep_pickup_datetime").alias("year"),
    F.month("tpep_pickup_datetime").alias("month"),
]
tip_metrics = [
    F.count("*").alias("nb_trips"),
    F.round(F.sum("tip_amount"), 2).alias("total_tips"),
    F.round(F.avg("tip_amount"), 2).alias("avg_tip_amount"),
]

total_tips_by_month = (
    clean_df.groupBy(*month_keys)
            .agg(*tip_metrics)
            .orderBy("year", "month")
)

display(total_tips_by_month)


In [0]:
# Debug aid: check that the "local-scope" secret scope exists and exposes the
# keys read by the Azure SQL connection cell (jdbc_username / jdbc_password).
# NOTE(review): these calls are expected to list scope/key metadata, not secret
# values. Confirm this before sharing the notebook output.
available_scopes = dbutils.secrets.listScopes()
print(available_scopes)
local_scope_keys = dbutils.secrets.list("local-scope")
print(local_scope_keys)

In [0]:
# ================================================ #
# ------------  Azure SQL Connection ------------- #
# ================================================ #

server   = "cacfsql.database.windows.net"
database = "brief"

# Credentials are read from the Databricks secret scope, never hardcoded.
user     = dbutils.secrets.get("local-scope", "jdbc_username")
password = dbutils.secrets.get("local-scope", "jdbc_password")

# SQL Server JDBC URL: host:port, then ';'-separated properties (trailing ';' kept)
jdbc_url = ";".join([
    f"jdbc:sqlserver://{server}:1433",
    f"database={database}",
    "encrypt=true",
    "trustServerCertificate=false",
]) + ";"

connection_props = dict(
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)


def write_to_sql(df, table_name, mode="overwrite", batchsize=10000):
    """Write a Spark DataFrame to an Azure SQL table over JDBC, then print a confirmation.

    Args:
        df: DataFrame to export.
        table_name: target table, e.g. "dbo.my_table".
        mode: Spark save mode. With "overwrite", the JDBC `truncate` option is
            enabled, so an existing table is truncated rather than dropped and
            recreated.
        batchsize: number of rows sent per JDBC insert batch.

    Uses the module-level `jdbc_url` and `connection_props` from the connection cell.
    """
    truncate_flag = "true" if mode == "overwrite" else "false"
    writer = (df.write.format("jdbc")
                .option("url", jdbc_url)
                .option("dbtable", table_name)
                .option("batchsize", batchsize)
                .option("truncate", truncate_flag)
                .options(**connection_props)
                .mode(mode))
    writer.save()
    print(f"Table {table_name} written to Azure SQL ({mode})")


# (DataFrame, target table) pairs, exported in this order with the default overwrite mode
sql_exports = [
    (top10_monthly,                     "dbo.dcatry_01_top10_pickup_zones_monthly"),
    (travel_duration_by_month,          "dbo.dcatry_02_avg_duration_by_month"),
    (travel_distance_by_payment_labels, "dbo.dcatry_03_avg_distance_by_payment"),
    (average_fare_by_passenger_count,   "dbo.dcatry_04_avg_fare_by_passenger_count"),
    (total_tips_by_month,               "dbo.dcatry_05_total_tips_by_month"),
]
for export_df, target_table in sql_exports:
    write_to_sql(export_df, target_table)




In [0]:
# ================================================== #
# ---------------- Azure SQL Tables ---------------- #
# ================================================== #
from pyspark.sql import DataFrame

# Reuse the JDBC URL and connection properties from the connection cell
# to list every table (schema + name) visible in the target database.
tables_query = "SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES"
tables_reader = (spark.read.format("jdbc")
                      .option("url", jdbc_url)
                      .option("query", tables_query)
                      .options(**connection_props))
tables_df = tables_reader.load()

display(tables_df)