In [2]:
# Load all 3 months CSV files from bronze folder
df_bronze = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/bronze/*.csv")

# Show basic info
print(f"Total raw rows loaded: {df.count():,}")
display(df.limit(10))


StatementMeta(, 4c2ac321-c3ba-405d-b61c-c99ac859dc37, 4, Finished, Available, Finished)

Total raw rows loaded: 1,136,124


SynapseWidget(Synapse.DataFrame, 29e34fae-7c23-42b8-98b6-cc9a6d26770a)

In [4]:
from pyspark.sql.functions import min, max, countDistinct

df_bronze.agg(
    min("tpep_pickup_datetime").alias("earliest_date"),
    max("tpep_pickup_datetime").alias("latest_date"),
    countDistinct("tpep_pickup_datetime").alias("distinct_dates")
).show(truncate=False)

# Also show distinct months present
df_bronze.selectExpr("month(tpep_pickup_datetime) as month").distinct().orderBy("month").show()

StatementMeta(, 4c2ac321-c3ba-405d-b61c-c99ac859dc37, 6, Finished, Available, Finished)

+-------------------+-------------------+--------------+
|earliest_date      |latest_date        |distinct_dates|
+-------------------+-------------------+--------------+
|2008-12-31 23:05:47|2020-11-01 15:41:04|981426        |
+-------------------+-------------------+--------------+

+-----+
|month|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
|   11|
|   12|
+-----+



In [6]:
df = spark.read.format("csv").option("header","true").load("Files/bronze/yellow_tripdata_2020-04.csv")
# df now is a Spark DataFrame containing CSV data from "Files/bronze/yellow_tripdata_2020-04.csv".
display(df)

df.selectExpr("month(tpep_pickup_datetime) as month").distinct().orderBy("month").show()

StatementMeta(, 4c2ac321-c3ba-405d-b61c-c99ac859dc37, 8, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 399dfad2-b843-4f9d-9a03-1312b60df58b)

+-----+
|month|
+-----+
|    1|
|    2|
|    3|
|    4|
+-----+



In [3]:
from pyspark.sql.functions import input_file_name, min,col, max,count, year, to_timestamp

# Load all CSVs and add source file name + parsed timestamp
df_debug = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("Files/bronze/*.csv") \
    .withColumn("source_file", input_file_name()) \
    .withColumn("pickup_ts", to_timestamp(col("tpep_pickup_datetime")))

# Group by file and show min/max year + date range + row count
df_debug.groupBy("source_file") \
        .agg(
            min("pickup_ts").alias("earliest_date_in_file"),
            max("pickup_ts").alias("latest_date_in_file"),
            min(year("pickup_ts")).alias("earliest_year"),
            max(year("pickup_ts")).alias("latest_year"),
            count("*").alias("rows_in_this_file")
        ) \
        .orderBy("source_file") \
        .show(truncate=False)

StatementMeta(, a3ff6131-c4bc-4672-be9d-eb83e4525ea6, 5, Finished, Available, Finished)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-------------+-----------+-----------------+
|source_file                                                                                                                                                                                       |earliest_date_in_file|latest_date_in_file|earliest_year|latest_year|rows_in_this_file|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------+-------------------+-------------+-----------+-----------------+
|abfss://2002e555-5023-4025-9909-bf3aacc355dc@onelake.dfs.fabric.microsoft.com/0b9fcbcf-29d2-45ed-9de9-0c4dca5e955a/Files/bronze/yellow_tripdata_2020-0