# Bronze Layer

In [0]:
import os
from pyspark.sql import DataFrame
from pyspark.sql.functions import *

In [0]:
%skip
df = spark.read.parquet("/Volumes/etl/raw/nyc_taxi/yellow/2023/yellow_tripdata_2023-01.parquet")

display(df)

In [0]:
%skip
[(field.name, field.dataType) for field in df.schema.fields]

## Yellow Taxi

In [0]:
# Ingest every yellow-taxi Parquet file under base_path into one bronze Delta table.
# Each file is cast onto the same fixed schema before the union — presumably because
# the raw files do not all share identical physical column types (TODO confirm).

# List all Parquet files in the directory
base_path = "/Volumes/etl/raw/nyc_taxi/yellow/2023/"
parquet_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.endswith('.parquet')]

# Fail fast with an actionable message; without this guard an empty directory
# only surfaces further down as an opaque IndexError on dfs[0].
if not parquet_files:
    raise ValueError(f"No .parquet files found in {base_path}")

# Columns to standardize, as (source column, target Spark type, bronze column name)
columns = [
    ("VendorID", "string", "vendor_id"),
    ("RatecodeID", "string", "rate_code_id"),
    ("PULocationID", "string", "pickup_location_id"),
    ("DOLocationID", "string", "dropoff_location_id"),
    ("tpep_pickup_datetime", "timestamp", "pickup_datetime"),
    ("tpep_dropoff_datetime", "timestamp", "dropoff_datetime"),
    ("store_and_fwd_flag", "string", "store_and_fwd_flag"),
    ("payment_type", "string", "payment_type"),
    ("passenger_count", "double", "passenger_count"),
    ("trip_distance", "double", "trip_distance"),
    ("fare_amount", "double", "fare_amount"),
    ("extra", "double", "extra"),
    ("mta_tax", "double", "mta_tax"),
    ("tip_amount", "double", "tip_amount"),
    ("tolls_amount", "double", "tolls_amount"),
    ("improvement_surcharge", "double", "improvement_surcharge"),
    ("total_amount", "double", "total_amount"),
    ("congestion_surcharge", "double", "congestion_surcharge"),
    ("airport_fee", "double", "airport_fee")
]

# The projection is the same for every file, so build it once instead of per file.
select_exprs = [
    col(source_name).cast(target_type).alias(bronze_name)
    for source_name, target_type, bronze_name in columns
]

# Read each file and project it onto the standardized schema
dfs = [spark.read.parquet(file_path).select(*select_exprs) for file_path in parquet_files]

# Union all per-file frames, matching columns by name
df_standardized = dfs[0]
for standardized_df in dfs[1:]:
    df_standardized = df_standardized.unionByName(standardized_df)

# Overwrite mode replaces the table contents on every run; overwriteSchema
# lets the stored schema be replaced along with the data.
df_standardized.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("etl.bronze.bronze__nyc_taxi_yellow")


## Green Taxi

In [0]:
# Ingest every green-taxi Parquet file under base_path into one bronze Delta table.
# Each file is cast onto the same fixed schema before the union — presumably because
# the raw files do not all share identical physical column types (TODO confirm).

# List all Parquet files in the directory
base_path = "/Volumes/etl/raw/nyc_taxi/green/2023/"
parquet_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.endswith('.parquet')]

# Fail fast with an actionable message; without this guard an empty directory
# only surfaces further down as an opaque IndexError on dfs[0].
if not parquet_files:
    raise ValueError(f"No .parquet files found in {base_path}")

# Columns to standardize, as (source column, target Spark type, bronze column name)
columns = [
    ("VendorID", "string", "vendor_id"),
    ("RatecodeID", "string", "rate_code_id"),
    ("PULocationID", "string", "pickup_location_id"),
    ("DOLocationID", "string", "dropoff_location_id"),
    ("lpep_pickup_datetime", "timestamp", "pickup_datetime"),
    ("lpep_dropoff_datetime", "timestamp", "dropoff_datetime"),
    ("store_and_fwd_flag", "string", "store_and_fwd_flag"),
    ("payment_type", "string", "payment_type"),
    ("trip_type", "string", "trip_type"),
    ("passenger_count", "double", "passenger_count"),
    ("trip_distance", "double", "trip_distance"),
    ("fare_amount", "double", "fare_amount"),
    ("extra", "double", "extra"),
    ("mta_tax", "double", "mta_tax"),
    ("tip_amount", "double", "tip_amount"),
    ("tolls_amount", "double", "tolls_amount"),
    ("improvement_surcharge", "double", "improvement_surcharge"),
    ("total_amount", "double", "total_amount"),
    ("congestion_surcharge", "double", "congestion_surcharge"),
    ("ehail_fee", "double", "ehail_fee")
]

# The projection is the same for every file, so build it once instead of per file.
select_exprs = [
    col(source_name).cast(target_type).alias(bronze_name)
    for source_name, target_type, bronze_name in columns
]

# Read each file and project it onto the standardized schema
dfs = [spark.read.parquet(file_path).select(*select_exprs) for file_path in parquet_files]

# Union all per-file frames, matching columns by name
df_standardized = dfs[0]
for standardized_df in dfs[1:]:
    df_standardized = df_standardized.unionByName(standardized_df)

# Overwrite mode replaces the table contents on every run; overwriteSchema
# lets the stored schema be replaced along with the data.
df_standardized.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("etl.bronze.bronze__nyc_taxi_green")


## FHV: For-Hire Vehicle

In [0]:
# Ingest every FHV Parquet file under base_path into one bronze Delta table.
# Each file is cast onto the same fixed schema before the union — presumably because
# the raw files do not all share identical physical column types (TODO confirm).

# List all Parquet files in the directory
base_path = "/Volumes/etl/raw/nyc_taxi/fhv/2023/"
parquet_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.endswith('.parquet')]

# Fail fast with an actionable message; without this guard an empty directory
# only surfaces further down as an opaque IndexError on dfs[0].
if not parquet_files:
    raise ValueError(f"No .parquet files found in {base_path}")

# Columns to standardize, as (source column, target Spark type, bronze column name).
# The mixed-case source names are renamed to snake_case bronze names.
columns = [
    ("dispatching_base_num", "string", "dispatching_base_num"),
    ("pickup_datetime", "timestamp", "pickup_datetime"),
    ("dropOff_datetime", "timestamp", "dropoff_datetime"),
    ("PUlocationID", "string", "pickup_location_id"),
    ("DOlocationID", "string", "dropoff_location_id"),
    ("SR_Flag", "string", "sr_flag"),
    ("Affiliated_base_number", "string", "affiliated_base_number")
]

# The projection is the same for every file, so build it once instead of per file.
select_exprs = [
    col(source_name).cast(target_type).alias(bronze_name)
    for source_name, target_type, bronze_name in columns
]

# Read each file and project it onto the standardized schema
dfs = [spark.read.parquet(file_path).select(*select_exprs) for file_path in parquet_files]

# Union all per-file frames, matching columns by name
df_standardized = dfs[0]
for standardized_df in dfs[1:]:
    df_standardized = df_standardized.unionByName(standardized_df)

# Overwrite mode replaces the table contents on every run; overwriteSchema
# lets the stored schema be replaced along with the data.
df_standardized.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("etl.bronze.bronze__nyc_taxi_fhv")


## FHVHV: High-Volume For-Hire Vehicle

In [0]:
# Ingest every FHVHV Parquet file under base_path into one bronze Delta table.
# Each file is cast onto the same fixed schema before the union — presumably because
# the raw files do not all share identical physical column types (TODO confirm).

# List all Parquet files in the directory
base_path = "/Volumes/etl/raw/nyc_taxi/fhvhv/2023/"
parquet_files = [f.path for f in dbutils.fs.ls(base_path) if f.name.endswith('.parquet')]

# Fail fast with an actionable message; without this guard an empty directory
# only surfaces further down as an opaque IndexError on dfs[0].
if not parquet_files:
    raise ValueError(f"No .parquet files found in {base_path}")

# Columns to standardize, as (source column, target Spark type, bronze column name)
columns = [
    ("hvfhs_license_num", "string", "hvfhs_license_num"),
    ("dispatching_base_num", "string", "dispatching_base_num"),
    ("originating_base_num", "string", "originating_base_num"),
    ("request_datetime", "timestamp", "request_datetime"),
    ("on_scene_datetime", "timestamp", "on_scene_datetime"),
    ("pickup_datetime", "timestamp", "pickup_datetime"),
    ("dropoff_datetime", "timestamp", "dropoff_datetime"),
    ("PULocationID", "string", "pickup_location_id"),
    ("DOLocationID", "string", "dropoff_location_id"),
    ("trip_miles", "double", "trip_miles"),
    ("trip_time", "long", "trip_time"),
    ("base_passenger_fare", "double", "base_passenger_fare"),
    ("tolls", "double", "tolls"),
    ("bcf", "double", "bcf"),
    ("sales_tax", "double", "sales_tax"),
    ("congestion_surcharge", "double", "congestion_surcharge"),
    ("airport_fee", "double", "airport_fee"),
    ("tips", "double", "tips"),
    ("driver_pay", "double", "driver_pay"),
    ("shared_request_flag", "string", "shared_request_flag"),
    ("shared_match_flag", "string", "shared_match_flag"),
    ("access_a_ride_flag", "string", "access_a_ride_flag"),
    ("wav_request_flag", "string", "wav_request_flag"),
    ("wav_match_flag", "string", "wav_match_flag")
]

# The projection is the same for every file, so build it once instead of per file.
select_exprs = [
    col(source_name).cast(target_type).alias(bronze_name)
    for source_name, target_type, bronze_name in columns
]

# Read each file and project it onto the standardized schema
dfs = [spark.read.parquet(file_path).select(*select_exprs) for file_path in parquet_files]

# Union all per-file frames, matching columns by name
df_standardized = dfs[0]
for standardized_df in dfs[1:]:
    df_standardized = df_standardized.unionByName(standardized_df)

# Overwrite mode replaces the table contents on every run; overwriteSchema
# lets the stored schema be replaced along with the data.
df_standardized.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("etl.bronze.bronze__nyc_taxi_fhvhv")
