# NYC Taxi data DLT Pipeline

In [None]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.window import Window


## Bronze Layer

In [None]:
# Bronze table: Raw NYC taxi data ingestion
@dlt.table(
  comment="Raw NYC taxi data",
  table_properties={"quality": "bronze"}
)
def bronze_nyc_taxi():
  """Stream the raw NYC taxi CSV files from cloud storage, unmodified.

  Files are picked up through the ``cloudFiles`` streaming source, configured
  for CSV input with a header row. No schema is supplied and no columns are
  transformed here.

  Returns:
    A streaming DataFrame over the files under the source directory.
  """
  source_path = "abfss://nyctaxi@ankurdataenggstorage.dfs.core.windows.net/nyc_taxi_data/"

  csv_reader = spark.readStream.format("cloudFiles")
  csv_reader = csv_reader.option("cloudFiles.format", "csv")
  csv_reader = csv_reader.option("header", "true")
  return csv_reader.load(source_path)


## Silver Layer

In [None]:
# Silver table: Cleaned and transformed NYC taxi data
@dlt.table(
  comment="Cleaned NYC taxi data",
  table_properties={"quality": "silver"}
)
def silver_nyc_taxi():
  """Type and filter the bronze taxi stream.

  Steps, in order:
    1. Parse the pickup/dropoff columns into timestamps.
    2. Cast ``passenger_count`` to integer and the distance, coordinate and
       amount columns to double.
    3. Derive ``pickup_date`` and ``pickup_hour`` from the parsed pickup
       timestamp.
    4. Keep only rows where both timestamps are non-null.

  Returns:
    A streaming DataFrame read from ``bronze_nyc_taxi`` with the columns above
    replaced or added; all other bronze columns pass through unchanged.
  """
  timestamp_columns = ("tpep_pickup_datetime", "tpep_dropoff_datetime")
  double_columns = (
    "trip_distance",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
    "fare_amount",
    "total_amount",
  )

  trips = dlt.read_stream("bronze_nyc_taxi")

  for ts_name in timestamp_columns:
    trips = trips.withColumn(ts_name, to_timestamp(ts_name))

  trips = trips.withColumn("passenger_count", col("passenger_count").cast("integer"))
  for numeric_name in double_columns:
    trips = trips.withColumn(numeric_name, col(numeric_name).cast("double"))

  # Derived after the timestamp parsing above so they read the parsed values.
  trips = trips.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
  trips = trips.withColumn("pickup_hour", hour("tpep_pickup_datetime"))

  has_both_timestamps = (
    col("tpep_pickup_datetime").isNotNull() & col("tpep_dropoff_datetime").isNotNull()
  )
  return trips.filter(has_both_timestamps)


## Gold Layer

# Gold dimension table: Date dimension
@dlt.table(
  comment="Date dimension",
  table_properties={"quality": "gold"}
)
def dim_date():
  """Build a calendar dimension with one row per day in a fixed date range.

  The range is 2015-01-01 through 2015-12-31, inclusive on both ends. Each row
  carries the generated ``id`` (day offset from the start date), the ``date``,
  an integer ``date_key`` in ``yyyyMMdd`` form, and the ``year``, ``month``,
  ``day`` and ``day_of_week`` parts as returned by the corresponding Spark
  functions.

  Returns:
    A batch DataFrame with one row per calendar day in the range.
  """
  # Imported locally so the block does not depend on the notebook's import cell.
  import datetime

  start_date = "2015-01-01"
  end_date = "2015-12-31"

  # The number of days must be a plain Python int: spark.range() cannot take a
  # Column, and subtracting two to_date(...) Columns yields a Column.
  first_day = datetime.date.fromisoformat(start_date)
  last_day = datetime.date.fromisoformat(end_date)
  num_days = (last_day - first_day).days + 1  # +1 makes the end date inclusive

  # range() produces a bigint `id`; cast it because date_add takes an integer
  # day count.
  dates = spark.range(0, num_days).withColumn(
    "date", expr(f"date_add('{start_date}', cast(id as int))")
  )
  dim_date_df = dates \
    .withColumn("date_key", date_format("date", "yyyyMMdd").cast("int")) \
    .withColumn("year", year("date")) \
    .withColumn("month", month("date")) \
    .withColumn("day", dayofmonth("date")) \
    .withColumn("day_of_week", dayofweek("date"))
  return dim_date_df

In [None]:
# Gold dimension table: Payment type dimension
@dlt.table(
  comment="Payment type dimension",
  table_properties={"quality": "gold"}
)
def dim_paymenttype():
  """Build the payment type dimension from the silver trip table.

  Takes the distinct ``payment_type`` values, attaches the placeholder
  description ``"Unknown"`` to every row, and assigns ``paymenttype_key`` as
  the row number when ordered by ``payment_type``.

  Returns:
    A batch DataFrame with ``payment_type``, ``paymenttype_description`` and
    ``paymenttype_key``.
  """
  trips = spark.read.format("delta").table("live.silver_nyc_taxi")
  payment_types = trips.select("payment_type").distinct()

  # The window has no partitioning, so numbering runs over the whole set.
  by_payment_type = Window.orderBy("payment_type")

  described = payment_types.withColumn("paymenttype_description", lit("Unknown"))
  return described.withColumn("paymenttype_key", row_number().over(by_payment_type))


In [None]:
# Gold dimension table: Ratecode dimension
@dlt.table(
  comment="Ratecode dimension",
  table_properties={"quality": "gold"}
)
def dim_ratecode():
  """Build the rate code dimension from the silver trip table.

  Takes the distinct ``RateCodeID`` values, attaches the placeholder
  description ``"Unknown"`` to every row, and assigns ``ratecode_key`` as the
  row number when ordered by ``RateCodeID``.

  Returns:
    A batch DataFrame with ``RateCodeID``, ``ratecode_description`` and
    ``ratecode_key``.
  """
  trips = spark.read.format("delta").table("live.silver_nyc_taxi")
  rate_codes = trips.select("RateCodeID").distinct()

  # The window has no partitioning, so numbering runs over the whole set.
  by_rate_code = Window.orderBy("RateCodeID")

  described = rate_codes.withColumn("ratecode_description", lit("Unknown"))
  return described.withColumn("ratecode_key", row_number().over(by_rate_code))


In [None]:
# Gold dimension table: Vendor dimension
@dlt.table(
  comment="Vendor dimension",
  table_properties={"quality": "gold"}
)
def dim_vendor():
  """Build the vendor dimension from the silver trip table.

  Takes the distinct ``VendorID`` values, derives ``vendor_name`` by prefixing
  the id with ``"Vendor "``, and assigns ``vendor_key`` as the row number when
  ordered by ``VendorID``.

  Returns:
    A batch DataFrame with ``VendorID``, ``vendor_name`` and ``vendor_key``.
  """
  trips = spark.read.format("delta").table("live.silver_nyc_taxi")
  vendors = trips.select("VendorID").distinct()

  # The window has no partitioning, so numbering runs over the whole set.
  by_vendor = Window.orderBy("VendorID")

  named = vendors.withColumn("vendor_name", concat(lit("Vendor "), col("VendorID")))
  return named.withColumn("vendor_key", row_number().over(by_vendor))


In [None]:
# Gold fact table: Trip fact table
@dlt.table(
  comment="Trip fact table",
  table_properties={"quality": "gold"}
)
def fact_trip():
  """Build the trip fact table by attaching dimension keys to silver trips.

  Reads ``silver_nyc_taxi`` as a stream and the vendor, rate code and payment
  type dimensions as batch tables. Each dimension is broadcast and left-joined
  on its natural key (``VendorID``, ``RateCodeID``, ``payment_type``), so trips
  without a matching dimension row are kept with a null key. ``date_key`` is
  derived from the pickup timestamp as an integer in ``yyyyMMdd`` form.

  Returns:
    A streaming DataFrame with the dimension keys followed by the trip
    timestamps, measures and coordinates.
  """
  trips = dlt.read_stream("silver_nyc_taxi")
  vendors = dlt.read("dim_vendor")
  rate_codes = dlt.read("dim_ratecode")
  payment_types = dlt.read("dim_paymenttype")

  # Join conditions reference each side through its own DataFrame because the
  # natural-key column names are the same on both sides.
  vendor_match = trips["VendorID"] == vendors["VendorID"]
  rate_code_match = trips["RateCodeID"] == rate_codes["RateCodeID"]
  payment_type_match = trips["payment_type"] == payment_types["payment_type"]

  keyed_trips = trips.withColumn(
    "date_key", date_format("tpep_pickup_datetime", "yyyyMMdd").cast("int")
  )
  joined = (
    keyed_trips
    .join(broadcast(vendors), vendor_match, "left")
    .join(broadcast(rate_codes), rate_code_match, "left")
    .join(broadcast(payment_types), payment_type_match, "left")
  )

  output_columns = [
    vendors["vendor_key"].alias("vendor_key"),
    "date_key",
    rate_codes["ratecode_key"].alias("ratecode_key"),
    payment_types["paymenttype_key"].alias("paymenttype_key"),
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
  ]
  return joined.select(*output_columns)