# 1. Create the Volume for our files

In [0]:
%sql
CREATE VOLUME taxi_lakehouse.bronze.raw_files;

## Find our files

In [0]:
display(dbutils.fs.ls("/Volumes/taxi_lakehouse/bronze/raw_files"))


path,name,size,modificationTime
dbfs:/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,yellow_tripdata_2023-01.parquet,47673370,1771347792000
dbfs:/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-02.parquet,yellow_tripdata_2023-02.parquet,47748012,1771347792000
dbfs:/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-03.parquet,yellow_tripdata_2023-03.parquet,56127762,1771347793000


# 2. Fix Schema issues

In [0]:
from pyspark.sql import functions as f

raw_dir = "/Volumes/taxi_lakehouse/bronze/raw_files/"

## List of files
#### We only need those whose ends with .parquet

In [0]:
files = [
    raw_dir + f.name
    for f in dbutils.fs.ls(raw_dir) 
    if f.name.endswith(".parquet")
]

files

['/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet',
 '/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-02.parquet',
 '/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-03.parquet']

## Finding conflicts

In [0]:
target_casts = {
    "VendorID": "bigint",
    "PULocationID": "bigint",
    "DOLocationID": "bigint",
    "passenger_count": "bigint",
    "RatecodeID": "bigint",
}

def read_cast(path):
    d = spark.read.parquet(path)
    for c, t in target_casts.items():
        if c in d.columns:
            d = d.withColumn(c, f.col(c).cast(t))
    d = d.withColumn("_source_file", f.lit(path))
    return d

dfs = [read_cast(p) for p in files]
len(dfs)



3

## Join columns

In [0]:
df = dfs[0]
for d in dfs[1:]:
    df = df.unionByName(d, allowMissingColumns=True)

## Add Time Columns

In [0]:
from pyspark.sql import functions as f

df = df.withColumn("_ingestion_ts", f.current_timestamp())



In [0]:
df.select("_source_file").distinct().show()

+--------------------+
|        _source_file|
+--------------------+
|/Volumes/taxi_lak...|
|/Volumes/taxi_lak...|
|/Volumes/taxi_lak...|
+--------------------+



# 2. Bronze Ingestion

In [0]:
df.write\
    .format("delta")\
    .mode("overwrite")\
    .saveAsTable("taxi_lakehouse.bronze.yellow_taxi_q1_2023")


In [0]:
%sql
SELECT *
FROM taxi_lakehouse.bronze.yellow_taxi_q1_2023
LIMIT 10;


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,_source_file,_ingestion_ts
2,2023-01-01T00:32:10.000,2023-01-01T00:40:36.000,1,0.97,1,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:55:08.000,2023-01-01T01:01:27.000,1,1.1,1,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:25:04.000,2023-01-01T00:37:49.000,1,2.51,1,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
1,2023-01-01T00:03:48.000,2023-01-01T00:13:25.000,0,1.9,1,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:10:29.000,2023-01-01T00:21:19.000,1,1.43,1,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:50:34.000,2023-01-01T01:02:52.000,1,1.84,1,N,161,137,1,12.8,1.0,0.5,10.0,0.0,1.0,27.8,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:09:22.000,2023-01-01T00:19:49.000,1,1.66,1,N,239,143,1,12.1,1.0,0.5,3.42,0.0,1.0,20.52,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:27:12.000,2023-01-01T00:49:56.000,1,11.7,1,N,142,200,1,45.7,1.0,0.5,10.74,3.0,1.0,64.44,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:21:44.000,2023-01-01T00:36:40.000,1,2.95,1,N,164,236,1,17.7,1.0,0.5,5.68,0.0,1.0,28.38,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
2,2023-01-01T00:39:42.000,2023-01-01T00:50:36.000,1,3.01,1,N,141,107,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0,/Volumes/taxi_lakehouse/bronze/raw_files/yellow_tripdata_2023-01.parquet,2026-02-17T19:36:47.590Z
