In [0]:
%sql
CREATE VOLUME IF NOT EXISTS workspace.default.nyc_taxi;

In [0]:
import os
import urllib.request
from datetime import datetime
from pyspark.sql.functions import current_timestamp, input_file_name

# Taxi types to process
TAXI_TYPES = ["yellow", "green", "fhv", "fhvhv"]
START_YEAR = 2020
CURRENT_YEAR = datetime.now().year

# NYC TLC data base URL
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

# DBFS mount locations
RAW_BASE = "/Volumes/workspace/default/nyc_taxi/raw"
BRONZE_BASE = "/Volumes/workspace/default/nyc_taxi/bronze"
SILVER_BASE = "/Volumes/workspace/default/nyc_taxi/silver"
GOLD_BASE = "/Volumes/workspace/default/nyc_taxi/gold"

In [0]:
def file_exists_dbfs(path):
    """Check if file exists in DBFS"""
    try:
        return len(dbutils.fs.ls(path)) > 0
    except:
        return False
    
def download_and_save(taxi_type, year, month):
    """
    Download monthly taxi file from NYC TLC and save to DBFS raw folder
    Skip if file already exists
    """
    file_name = f"{taxi_type}_tripdata_{year}-{month:02d}.parquet"
    raw_dir = f"{RAW_BASE}/{taxi_type}/{year}"
    raw_path = f"{raw_dir}/{file_name}"
    
    # Skip if file already in raw
    if file_exists_dbfs(raw_path):
        print(f"Skipping download (already exists): {raw_path}")
        return raw_path
    
    # ensure parent dir exists 
    os.makedirs(raw_dir, exist_ok=True)

    # Download from source and save to DBFS raw folder
    url = f"{BASE_URL}/{file_name}"
    try:
        print(f"Downloading {url}")
        urllib.request.urlretrieve(url, raw_path)
        print(f"Saved raw file to {raw_path}")
    except Exception as e:
        print(f"File not found or error downloading {file_name}: {e}")
        return None
    
    return raw_path

In [0]:
from pyspark.sql.functions import col, lit
def load_to_bronze(raw_path, taxi_type):
    """
    Load raw file into Bronze Delta table with minimal transformations
    Skip if file already ingested (by checking source_file in bronze)
    """
    bronze_path = f"{BRONZE_BASE}/{taxi_type}"
    table_name = f"bronze_{taxi_type}_taxi"
    
    # Check if file already loaded
    try:
        bronze_df = spark.read.format("delta").load(bronze_path)
        if "source_file" in bronze_df.columns:
            if bronze_df.filter(bronze_df.source_file.endswith(f"/{os.path.basename(raw_path)}")).limit(1).count() > 0:
                print(f"Skipping Bronze load (already ingested): {raw_path}")
                return
    except:
        # Bronze table might not exist yet
        pass

    # Load parquet file
    df = (spark.read.format("parquet")
          .option("inferSchema", "true")
          .load(raw_path)
          .withColumn("ingestion_date", current_timestamp())
          .withColumn("source_file", lit(os.path.basename(raw_path)))
          .withColumn("taxi_type", lit(taxi_type))
        )
    # Pentru a evita erorile de inconsistenta, cast to string
    for col_name, dtype in df.dtypes:
        df = df.withColumn(col_name, col(col_name).cast("string"))

    # Append to Delta table
    (df.write
       .format("delta")
       .mode("append")
       .option("mergeSchema", "true")
       .save(bronze_path))

    print(f"Written to Bronze: {bronze_path}")


In [0]:
# for taxi_type in TAXI_TYPES:
#     for year in range(START_YEAR, CURRENT_YEAR + 1):
#         for month in range(1, 13):
#             # Skip future months
#             if year == CURRENT_YEAR and month > datetime.now().month:
#                 continue
            
#             # Download raw file (skip if exists)
#             raw_file_path = download_and_save(taxi_type, year, month)
#             if raw_file_path:
#                 # Load into Bronze (skip if already ingested)
#                 load_to_bronze(raw_file_path, taxi_type)

# print("Ingestion complete.")

Skipping download (already exists): /Volumes/workspace/default/nyc_taxi/raw/yellow/2020/yellow_tripdata_2020-01.parquet
Written to Bronze: /Volumes/workspace/default/nyc_taxi/bronze/yellow
Skipping download (already exists): /Volumes/workspace/default/nyc_taxi/raw/yellow/2020/yellow_tripdata_2020-02.parquet
Written to Bronze: /Volumes/workspace/default/nyc_taxi/bronze/yellow
Skipping download (already exists): /Volumes/workspace/default/nyc_taxi/raw/yellow/2020/yellow_tripdata_2020-03.parquet
Written to Bronze: /Volumes/workspace/default/nyc_taxi/bronze/yellow
Skipping download (already exists): /Volumes/workspace/default/nyc_taxi/raw/yellow/2020/yellow_tripdata_2020-04.parquet
Written to Bronze: /Volumes/workspace/default/nyc_taxi/bronze/yellow
Skipping download (already exists): /Volumes/workspace/default/nyc_taxi/raw/yellow/2020/yellow_tripdata_2020-05.parquet
Written to Bronze: /Volumes/workspace/default/nyc_taxi/bronze/yellow
Skipping download (already exists): /Volumes/workspace/

In [0]:
bronze_yellow_taxi = spark.read.format("delta").load(f"{BRONZE_BASE}/yellow")
bronze_yellow_taxi.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- airport_fee: string (nullable = true)
 |-- ingestion_date: string (nullable = true)
 |-- source_file: string (nullable = true)
 |-- taxi_type: string (nullable = true)
 |-- cb

In [0]:
bronze_green_taxi = spark.read.format("delta").load(f"{BRONZE_BASE}/green")
bronze_green_taxi.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- congestion_surcharge: string (nullable = true)
 |-- ingestion_date: string (nullable = true)
 |-- source_file: string (nullable = true)
 |-- taxi

In [0]:
sample_raw_yellow = spark.read.parquet("/Volumes/workspace/default/nyc_taxi/raw/yellow/2025/yellow_tripdata_2025-04.parquet")
sample_raw_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [0]:
bronze_yellow_taxi.select("airport_fee").distinct().show()

+-----------+
|airport_fee|
+-----------+
|       NULL|
|       1.75|
|       1.25|
|        0.0|
|      -1.75|
|       6.75|
|        5.0|
|      -1.25|
|       0.75|
|        0.5|
|        1.7|
|        1.0|
+-----------+



In [0]:
bronze_fhv = spark.read.format("delta").load(f"{BRONZE_BASE}/fhv")
bronze_fhv.printSchema()
bronze_fhvhv = spark.read.format("delta").load(f"{BRONZE_BASE}/fhvhv")
bronze_fhvhv.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropOff_datetime: string (nullable = true)
 |-- PUlocationID: string (nullable = true)
 |-- DOlocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- ingestion_date: string (nullable = true)
 |-- source_file: string (nullable = true)
 |-- taxi_type: string (nullable = true)

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: string (nullable = true)
 |-- on_scene_datetime: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- trip_miles: string (nullable = true)
 |-- trip_time: string (nullable = true)
 |-- base_passe

**Silver**

In [0]:
# Cast the columns to the right type
from pyspark.sql.functions import col, lit, to_timestamp, coalesce, sha2, concat_ws

taxi_cols = [
    "vendor_id", "pickup_datetime", "dropoff_datetime", "passenger_count",
    "trip_distance", "pickup_location_id", "dropoff_location_id",
    "rate_code", "payment_type", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "congestion_surcharge",
    "airport_fee", "total_amount", "trip_type", "cbd_congestion_fee",
    "ehail_fee", "taxi_type", "trip_id"
]

fhv_cols = [
    "dispatching_base_num","pickup_datetime","dropoff_datetime","pickup_location_id",
    "dropoff_location_id","sr_flag", "passenger_count", "trip_distance", "fare_amount",
    "tip_amount", "total_amount","trip_type", "trip_id"
]

fhvhv_cols = [
    "hvfhs_license_num","pickup_datetime","dropoff_datetime","pickup_location_id","dropoff_location_id",
    "dispatching_base_num","trip_distance","trip_duration_seconds","fare_amount","tip_amount",
    "tolls_amount","airport_fee","congestion_surcharge","sales_tax","bcf","driver_pay",
    "cbd_congestion_fee","total_amount","trip_type", "trip_id"
]

def transform_yellow(bronze_df):
    return (
        bronze_df
        .withColumn("vendor_id", col("VendorID").cast("long"))
        .withColumn("pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
        .withColumn("dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
        .withColumn("passenger_count", col("passenger_count").cast("double"))
        .withColumn("trip_distance", col("trip_distance").cast("double"))
        .withColumn("pickup_location_id", col("PULocationID").cast("int"))
        .withColumn("dropoff_location_id", col("DOLocationID").cast("int"))
        .withColumn("rate_code", col("RatecodeID").cast("double").cast("int"))
        .withColumn("payment_type", col("payment_type").cast("int"))
        .withColumn("fare_amount", col("fare_amount").cast("double"))
        .withColumn("extra", col("extra").cast("double"))
        .withColumn("mta_tax", col("mta_tax").cast("double"))
        .withColumn("tip_amount", col("tip_amount").cast("double"))
        .withColumn("tolls_amount", col("tolls_amount").cast("double"))
        .withColumn("improvement_surcharge", col("improvement_surcharge").cast("double"))
        .withColumn("congestion_surcharge", col("congestion_surcharge").cast("double"))
        .withColumn("airport_fee", col("airport_fee").cast("double"))
        .withColumn("total_amount", col("total_amount").cast("double"))
        .withColumn("trip_type", lit("taxi"))  # added by me
        .withColumn("cbd_congestion_fee", lit(None).cast("double")) # available in Yellow from 2025
        .withColumn("ehail_fee", lit(None).cast("double")) # not available in Yellow
        .withColumn("taxi_type", lit("yellow"))
        .withColumn("trip_id", sha2(concat_ws("||",
            col("pickup_datetime").cast("string"),
            col("dropoff_datetime").cast("string"),
            col("pickup_location_id"),
            col("dropoff_location_id"),
            col("vendor_id"),
            col("taxi_type")
        ), 256))
        .select(taxi_cols)
    )
    
def transform_green(bronze_df):
    return (
        bronze_df
        .withColumn("vendor_id", col("VendorID").cast("long"))
        .withColumn("pickup_datetime", to_timestamp(col("lpep_pickup_datetime")))
        .withColumn("dropoff_datetime", to_timestamp(col("lpep_dropoff_datetime")))
        .withColumn("passenger_count", col("passenger_count").cast("double"))
        .withColumn("trip_distance", col("trip_distance").cast("double"))
        .withColumn("pickup_location_id", col("PULocationID").cast("int"))
        .withColumn("dropoff_location_id", col("DOLocationID").cast("int"))
        .withColumn("rate_code", col("RatecodeID").cast("double").cast("int"))
        .withColumn("payment_type", col("payment_type").cast("double").cast("int"))
        .withColumn("fare_amount", col("fare_amount").cast("double"))
        .withColumn("extra", col("extra").cast("double"))
        .withColumn("mta_tax", col("mta_tax").cast("double"))
        .withColumn("tip_amount", col("tip_amount").cast("double"))
        .withColumn("tolls_amount", col("tolls_amount").cast("double"))
        .withColumn("ehail_fee", col("ehail_fee").cast("double"))
        .withColumn("improvement_surcharge", col("improvement_surcharge").cast("double"))
        .withColumn("congestion_surcharge", col("congestion_surcharge").cast("double"))
        .withColumn("airport_fee", lit(None).cast("double"))  # not available in Green
        .withColumn("total_amount", col("total_amount").cast("double"))
        .withColumn("trip_type", lit("taxi")) # added by me
        .withColumn("cbd_congestion_fee", col("cbd_congestion_fee").cast("double")) # available in Green from 2025
        .withColumn("taxi_type", lit("green"))
        .withColumn("trip_id", sha2(concat_ws("||",
            col("pickup_datetime").cast("string"),
            col("dropoff_datetime").cast("string"),
            col("pickup_location_id"),
            col("dropoff_location_id"),
            col("vendor_id"),
            col("taxi_type")
        ), 256))
        .select(taxi_cols)
    )

def transform_fhv(bronze_df):
    return (
        bronze_df
        .withColumn("pickup_datetime", to_timestamp(col("pickup_datetime")))
        .withColumn("dropoff_datetime", to_timestamp(col("dropOff_datetime")))
        .withColumn("pickup_location_id", col("PUlocationID").cast("double").cast("int"))
        .withColumn("dropoff_location_id", col("DOlocationID").cast("double").cast("int"))
        .withColumn("dispatching_base_num", col("dispatching_base_num"))
        .withColumn("sr_flag", col("SR_Flag"))
        .withColumn("fare_amount", lit(None).cast("double"))
        .withColumn("trip_distance", lit(None).cast("double"))
        .withColumn("passenger_count", lit(None).cast("double"))
        .withColumn("tip_amount", lit(None).cast("double"))
        .withColumn("total_amount", lit(None).cast("double"))
        .withColumn("trip_type", lit("fhv"))
        .withColumn("trip_id", sha2(concat_ws("||",
            col("pickup_datetime").cast("string"),
            col("dropoff_datetime").cast("string"),
            col("pickup_location_id"),
            col("dropoff_location_id"),
            col("dispatching_base_num"),
            col("trip_type")
        ), 256))
        .select(fhv_cols)
    )   

def transform_fhvhv(bronze_df):
    return (
        bronze_df
        .withColumn("pickup_datetime", to_timestamp(col("pickup_datetime")))
        .withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime")))
        .withColumn("pickup_location_id", col("PUlocationID").cast("double").cast("int"))
        .withColumn("dropoff_location_id", col("DOlocationID").cast("double").cast("int"))
        .withColumn("hvfhs_license_num", col("hvfhs_license_num"))  # Uber/Lyft/Via/Juno string code
        .withColumn("dispatching_base_num", col("dispatching_base_num"))  
        .withColumn("trip_distance", col("trip_miles").cast("double"))
        .withColumn("trip_duration_seconds", col("trip_time").cast("double"))
        .withColumn("fare_amount", col("base_passenger_fare").cast("double"))
        .withColumn("tip_amount", col("tips").cast("double"))
        .withColumn("tolls_amount", col("tolls").cast("double"))
        .withColumn("airport_fee", col("airport_fee").cast("double"))
        .withColumn("congestion_surcharge", col("congestion_surcharge").cast("double"))
        .withColumn("sales_tax", col("sales_tax").cast("double"))
        .withColumn("bcf", col("bcf").cast("double"))  # Black Car Fund surcharge
        .withColumn("driver_pay", col("driver_pay").cast("double"))
        .withColumn("cbd_congestion_fee", col("cbd_congestion_fee").cast("double")) 
        .withColumn("total_amount", 
                    (coalesce(col("fare_amount"), lit(0)) + 
                    coalesce(col("tolls_amount"), lit(0)) + 
                    coalesce(col("sales_tax"), lit(0)) + 
                    coalesce(col("congestion_surcharge"), lit(0))+ 
                    coalesce(col("driver_pay"), lit(0)) + 
                    coalesce(col("bcf"), lit(0)) + 
                    coalesce(col("airport_fee"), lit(0)) + 
                    coalesce(col("tip_amount"), lit(0))).cast("double"))
        .withColumn("trip_type", lit("fhvhv"))
        .withColumn("trip_id", sha2(concat_ws("||",
            col("pickup_datetime").cast("string"),
            col("dropoff_datetime").cast("string"),
            col("pickup_location_id"),
            col("dropoff_location_id"),
            col("hvfhs_license_num"),
            col("trip_type")
        ), 256))
        .select(fhvhv_cols)
    )


In [0]:
silver_yellow_taxi = transform_yellow(bronze_yellow_taxi)
silver_green_taxi = transform_green(bronze_green_taxi)
silver_fhv = transform_fhv(bronze_fhv)
silver_fhvhv = transform_fhvhv(bronze_fhvhv)

In [0]:
# bronze_yellow_taxi.show(5)
silver_yellow_taxi.show(5)
# bronze_green_taxi.show(5)
silver_green_taxi.show(5)
# bronze_fhv.show(5)
silver_fhv.show(5)
# bronze_fhvhv.show(3)
silver_fhvhv.show(3)

+---------+-------------------+-------------------+---------------+-------------+------------------+-------------------+---------+------------+-----------+-----+-------+----------+------------+---------------------+--------------------+-----------+------------+---------+------------------+---------+---------+--------------------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_location_id|dropoff_location_id|rate_code|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|congestion_surcharge|airport_fee|total_amount|trip_type|cbd_congestion_fee|ehail_fee|taxi_type|             trip_id|
+---------+-------------------+-------------------+---------------+-------------+------------------+-------------------+---------+------------+-----------+-----+-------+----------+------------+---------------------+--------------------+-----------+------------+---------+------------------+---------+---------+--------------------+
|   

In [0]:
from pyspark.sql.functions import col, lit, to_timestamp, unix_timestamp

silver_taxi = silver_yellow_taxi.unionByName(silver_green_taxi)

# add trip_duration_seconds to taxi
silver_taxi = silver_taxi \
    .withColumn("trip_duration_seconds",
                    unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))
                    )

# add trip_duration_seconds to fhv    
silver_fhv = silver_fhv \
    .withColumn("trip_duration_seconds",
                    unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))
                    )

In [0]:
from pyspark.sql.functions import col, lit

def enrich_silver(df, source):
    return (
        df
        .withColumn("is_valid_trip",
                    (col("trip_distance").isNotNull()) &
                    (col("trip_distance") > 0) &
                    (col("trip_duration_seconds").isNotNull()) &
                    (col("trip_duration_seconds") > 0) &
                    (col("fare_amount").isNotNull()) &
                    (col("fare_amount") > 0) 
                    )
        .withColumn("source", lit(source))
    )

def enrich_silver_for_fhv(df):
    return (
        df
        .withColumn("is_valid_trip",
                    (col("trip_duration_seconds").isNotNull()) &
                    (col("trip_duration_seconds") > 0)
                    )
        .withColumn("source", lit("fhv"))
    )

In [0]:
silver_taxi = enrich_silver(silver_taxi, "taxi")
silver_fhv = enrich_silver_for_fhv(silver_fhv)
silver_fhvhv = enrich_silver(silver_fhvhv, "fhvhv")

In [0]:
# Filter on is valid trip
silver_taxi = silver_taxi.filter(silver_taxi["is_valid_trip"] == True).drop("is_valid_trip")
silver_fhv = silver_fhv.filter(silver_fhv["is_valid_trip"] == True).drop("is_valid_trip")
silver_fhvhv = silver_fhvhv.filter(silver_fhvhv["is_valid_trip"] == True).drop("is_valid_trip")

In [0]:
silver_taxi.show(5)
silver_fhv.show(5)
silver_fhvhv.show(3)

+---------+-------------------+-------------------+---------------+-------------+------------------+-------------------+---------+------------+-----------+-----+-------+----------+------------+---------------------+--------------------+-----------+------------+---------+------------------+---------+---------+--------------------+---------------------+------+
|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_distance|pickup_location_id|dropoff_location_id|rate_code|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|congestion_surcharge|airport_fee|total_amount|trip_type|cbd_congestion_fee|ehail_fee|taxi_type|             trip_id|trip_duration_seconds|source|
+---------+-------------------+-------------------+---------------+-------------+------------------+-------------------+---------+------------+-----------+-----+-------+----------+------------+---------------------+--------------------+-----------+------------+---------+-------

In [0]:
# # write to silver
# silver_taxi.write.format("delta").mode("overwrite").save(f"{SILVER_BASE}/silver_taxi")
# silver_fhv.write.format("delta").mode("overwrite").save(f"{SILVER_BASE}/silver_fhv")
# silver_fhvhv.write.format("delta").mode("overwrite").save(f"{SILVER_BASE}/silver_fhvhv")

**Gold**

In [0]:
from pyspark.sql.functions import col, year, month, dayofmonth, hour, dayofweek, weekofyear, quarter, to_date, lit, monotonically_increasing_id

# read from silver
silver_taxi_df = spark.read.format("delta").load(f"{SILVER_BASE}/silver_taxi")
silver_fhv_df = spark.read.format("delta").load(f"{SILVER_BASE}/silver_fhv")
silver_fhvhv_df = spark.read.format("delta").load(f"{SILVER_BASE}/silver_fhvhv")

In [0]:
calendar_df = (
    silver_taxi_df.select("pickup_datetime")
    .union(silver_fhv_df.select("pickup_datetime"))
    .union(silver_fhvhv_df.select("pickup_datetime"))
    .distinct()
    .withColumn("date", to_date(col("pickup_datetime")))
    .withColumn("year", year(col("pickup_datetime")))
    .withColumn("month", month(col("pickup_datetime")))
    .withColumn("day", dayofmonth(col("pickup_datetime")))
    .withColumn("hour", hour(col("pickup_datetime")))
    .withColumn("weekday", dayofweek(col("pickup_datetime")))
    .withColumn("week_of_year", weekofyear(col("pickup_datetime")))
    .withColumn("quarter", quarter(col("pickup_datetime")))
    .drop("pickup_datetime")
    .distinct()
    .orderBy(["date", "hour"], ascending=[True, True])
    .withColumn("calendar_id", monotonically_increasing_id())
)
# calendar_df.show(5)

# calendar_df.write.format("delta").mode("overwrite").save(f"{GOLD_BASE}/dim_calendar")

+----------+----+-----+---+----+-------+------------+-------+-----------+
|      date|year|month|day|hour|weekday|week_of_year|quarter|calendar_id|
+----------+----+-----+---+----+-------+------------+-------+-----------+
|2001-01-01|2001|    1|  1|   0|      2|           1|      1|          0|
|2001-01-01|2001|    1|  1|   1|      2|           1|      1|          1|
|2001-01-01|2001|    1|  1|  15|      2|           1|      1|          2|
|2001-08-23|2001|    8| 23|   5|      5|          34|      3|          3|
|2002-10-21|2002|   10| 21|   0|      2|          43|      4|          4|
+----------+----+-----+---+----+-------+------------+-------+-----------+
only showing top 5 rows


In [0]:
dim_calendar = spark.read.format("delta").load(f"{GOLD_BASE}/dim_calendar")
display(dim_calendar.filter(dim_calendar["year"] < 2020))

calendar_id,date,year,month,day,hour,weekday,week_of_year,quarter
0,2001-01-01,2001,1,1,0,2,1,1
1,2001-01-01,2001,1,1,1,2,1,1
2,2001-01-01,2001,1,1,15,2,1,1
3,2001-08-23,2001,8,23,5,5,34,3
4,2002-10-21,2002,10,21,0,2,43,4
5,2002-10-21,2002,10,21,1,2,43,4
6,2002-10-21,2002,10,21,5,2,43,4
7,2002-10-21,2002,10,21,8,2,43,4
8,2002-10-21,2002,10,21,9,2,43,4
9,2002-10-21,2002,10,21,10,2,43,4


In [0]:
zones_df = spark.read.csv("/Volumes/workspace/default/nyc_taxi/reference/taxi_zone_lookup.csv", header=True, inferSchema=True)
zones_df = (
    zones_df 
    .withColumnRenamed("LocationID", "location_id")
    .withColumnRenamed("Borough", "borough")
    .withColumnRenamed("Zone", "zone")
)
# zones_df.display()

# zones_df.write.format("delta").mode("overwrite").save(f"{GOLD_BASE}/dim_location")

In [0]:
dim_location = spark.read.format("delta").load(f"{GOLD_BASE}/dim_location")
dim_location.show(5)

+-----------+-------------+--------------------+------------+
|location_id|      borough|                zone|service_zone|
+-----------+-------------+--------------------+------------+
|          1|          EWR|      Newark Airport|         EWR|
|          2|       Queens|         Jamaica Bay|   Boro Zone|
|          3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|          4|    Manhattan|       Alphabet City| Yellow Zone|
|          5|Staten Island|       Arden Heights|   Boro Zone|
+-----------+-------------+--------------------+------------+
only showing top 5 rows


In [0]:
from pyspark.sql.functions import lit

vendor_taxi = (
    silver_taxi_df
    .select(col("vendor_id").cast("string").alias("vendor_code_key"))
    .distinct()
    .withColumn("trip_type", lit("taxi"))
)

vendor_fhv = (
    silver_fhv_df
    .select(col("dispatching_base_num").alias("vendor_code_key"))
    .distinct()
    .withColumn("trip_type", lit("fhv"))
)

vendor_fhvhv = (
    silver_fhvhv_df
    .select(col("hvfhs_license_num").alias("vendor_code_key"))
    .distinct()
    .withColumn("trip_type", lit("fhvhv"))
)

vendor_df = vendor_taxi.unionByName(vendor_fhv).unionByName(vendor_fhvhv)
vendor_df = vendor_df.withColumn("vendor_id", monotonically_increasing_id())
vendor_df.display()

vendor_df.write.format("delta").mode("overwrite").save(f"{GOLD_BASE}/dim_vendor")

vendor_code_key,trip_type,vendor_id
2,taxi,0
6,taxi,1
5,taxi,2
1,taxi,3
B01663,fhv,4
B00608,fhv,5
B03417,fhv,6
B02852,fhv,7
B02770,fhv,8
B03309,fhv,9


In [0]:
dim_vendor = spark.read.format("delta").load(f"{GOLD_BASE}/dim_vendor")
dim_vendor.orderBy("vendor_id").display()

vendor_code_key,trip_type,vendor_id
2,taxi,0
6,taxi,1
5,taxi,2
1,taxi,3
B01663,fhv,8589934592
B00608,fhv,8589934593
B03417,fhv,8589934594
B02852,fhv,8589934595
B02770,fhv,8589934596
B03309,fhv,8589934597
