# Data Reading

In [1]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Build the Spark session for this notebook (app name "Silver").
# The two settings register the Delta Lake SQL extension and use DeltaCatalog
# as the session catalog; the builder is then passed through
# configure_spark_with_delta_pip before the session is created or reused.
builder = SparkSession.builder.appName("Silver")
builder = builder.config(
    "spark.sql.extensions",
    "io.delta.sql.DeltaSparkSessionExtension",
)
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [2]:
#importing modules
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
# Load the trip type table from the bronze layer.
# The table is read in Delta format, not as raw CSV.
df_trip_type = (
    spark.read
    .format("delta")
    .load(r"C:\Users\gauth\Desktop\Extended Medallion Architecture\storage_bronze\trip_type")
)


In [4]:
# Preview the trip type table as loaded from bronze.
df_trip_type.show()

+---------+-----------+
|trip_type|description|
+---------+-----------+
|        1|Street-hail|
|        2|   Dispatch|
+---------+-----------+



In [5]:
# Load the trip zone table from the bronze layer.
# The table is read in Delta format, not as raw CSV.
df_trip_zone = (
    spark.read
    .format("delta")
    .load(r"C:\Users\gauth\Desktop\Extended Medallion Architecture\storage_bronze\trip_zone")
)


In [6]:
# Preview the zone table; truncate=False prints long Zone names in full.
df_trip_zone.show(truncate=False)

+----------+-------------+-----------------------+------------+
|LocationID|Borough      |Zone                   |service_zone|
+----------+-------------+-----------------------+------------+
|1         |EWR          |Newark Airport         |EWR         |
|2         |Queens       |Jamaica Bay            |Boro Zone   |
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |
|4         |Manhattan    |Alphabet City          |Yellow Zone |
|5         |Staten Island|Arden Heights          |Boro Zone   |
|6         |Staten Island|Arrochar/Fort Wadsworth|Boro Zone   |
|7         |Queens       |Astoria                |Boro Zone   |
|8         |Queens       |Astoria Park           |Boro Zone   |
|9         |Queens       |Auburndale             |Boro Zone   |
|10        |Queens       |Baisley Park           |Boro Zone   |
|11        |Brooklyn     |Bath Beach             |Boro Zone   |
|12        |Manhattan    |Battery Park           |Yellow Zone |
|13        |Manhattan    |Battery Park C

In [7]:
# Load the trip records from the bronze layer (Delta format).
df_trip_data = (
    spark.read
    .format("delta")
    .load(r"C:\Users\gauth\Desktop\Extended Medallion Architecture\storage_bronze\trip_data")
)

In [8]:
# Preview the raw trip records before any cleaning.
df_trip_data.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|cbd_congestion_fee|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|       2| 2025-03-01 00:07:34|  2025-03-01 00:24:52|                 N|         1|          75|         2

# Data Transformations

### Trip Type

In [9]:
# Rename "description" to "trip_description"; all other columns are untouched.
df_trip_type = df_trip_type.withColumnRenamed(
    "description",
    "trip_description",
)

In [10]:
# Confirm the column now appears as trip_description.
df_trip_type.show()

+---------+----------------+
|trip_type|trip_description|
+---------+----------------+
|        1|     Street-hail|
|        2|        Dispatch|
+---------+----------------+



In [11]:
# Save the trip type table to the silver layer in Delta format.
# mode("overwrite") replaces whatever is already stored at this path.
(
    df_trip_type.write
    .format("delta")
    .mode("overwrite")
    .option("path", r"C:\Users\gauth\Desktop\Extended Medallion Architecture\storage_silver\trip_type")
    .save()
)

### Trip Zone

In [12]:
# Preview the zone table before transforming it (default show() shortens long strings).
df_trip_zone.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [13]:
# Some Zone values hold several names joined by "/" (e.g. "Allerton/Pelham Gardens").
# Split on the first "/" only (limit = 2): zone1 gets the text before it and
# zone2 gets everything after it. A value with more than one "/" therefore keeps
# its remaining names in zone2 instead of losing them when Zone is dropped.
# Rows without a "/" get NULL in zone2, because get() returns NULL for an index
# past the end of the array.
df_trip_zone = (
    df_trip_zone
    .withColumn("zone1", expr("get(split(Zone, '/', 2), 0)"))
    .withColumn("zone2", expr("get(split(Zone, '/', 2), 1)"))
)


In [14]:
# Remove the original Zone column now that zone1/zone2 have been derived from it.
df_trip_zone = df_trip_zone.drop("Zone")

In [15]:
# Row count after the split; adding and dropping columns should not change it.
df_trip_zone.count()

265

In [16]:
# Inspect the zone1/zone2 columns produced by the split.
df_trip_zone.show()

+----------+-------------+------------+-----------------+--------------+
|LocationID|      Borough|service_zone|            zone1|         zone2|
+----------+-------------+------------+-----------------+--------------+
|         1|          EWR|         EWR|   Newark Airport|          NULL|
|         2|       Queens|   Boro Zone|      Jamaica Bay|          NULL|
|         3|        Bronx|   Boro Zone|         Allerton|Pelham Gardens|
|         4|    Manhattan| Yellow Zone|    Alphabet City|          NULL|
|         5|Staten Island|   Boro Zone|    Arden Heights|          NULL|
|         6|Staten Island|   Boro Zone|         Arrochar|Fort Wadsworth|
|         7|       Queens|   Boro Zone|          Astoria|          NULL|
|         8|       Queens|   Boro Zone|     Astoria Park|          NULL|
|         9|       Queens|   Boro Zone|       Auburndale|          NULL|
|        10|       Queens|   Boro Zone|     Baisley Park|          NULL|
|        11|     Brooklyn|   Boro Zone|       Bath 

In [17]:
# Save the zone table to the silver layer in Delta format.
# mode("overwrite") replaces whatever is already stored at this path.
(
    df_trip_zone.write
    .format("delta")
    .mode("overwrite")
    .option("path", r"C:\Users\gauth\Desktop\Extended Medallion Architecture\storage_silver\trip_zone")
    .save()
)

### Trip Data

Surface Level Exploration

In [18]:
# Number of trip records loaded from bronze, before any filtering.
df_trip_data.count()

543139

In [19]:
# Per-column descriptive statistics, used to spot suspicious ranges before filtering.
df_trip_data.summary().show()

+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+---------+---------------------+------------------+-------------------+-------------------+--------------------+-------------------+
|summary|          VendorID|store_and_fwd_flag|        RatecodeID|      PULocationID|      DOLocationID|   passenger_count|     trip_distance|       fare_amount|             extra|            mta_tax|        tip_amount|      tolls_amount|ehail_fee|improvement_surcharge|      total_amount|       payment_type|          trip_type|congestion_surcharge| cbd_congestion_fee|
+-------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+---------+----------

In [20]:
# Approximate number of distinct values in every column, returned as one row
# whose column names match the source columns.
df_trip_data.select(
    *(
        approx_count_distinct(column_name).alias(column_name)
        for column_name in df_trip_data.columns
    )
).show()


+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|cbd_congestion_fee|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+
|       3|              573377|               526856|                 2|         7|         255|         2

Column Standardization

In [21]:
# Drop the "lpep_" prefix from the pickup and dropoff timestamp columns.
df_trip_data = df_trip_data.withColumnsRenamed(
    {
        "lpep_pickup_datetime": "pickup_datetime",
        "lpep_dropoff_datetime": "dropoff_datetime",
    }
)

Business Logic

In [22]:
# Keep only plausible trips. Every condition must hold, and between() is
# inclusive on both ends. A row with NULL in any tested column is dropped
# because its comparison does not evaluate to true.
df_trip_data = (
    df_trip_data
    .where(col("fare_amount").between(3, 200))
    .where(col("total_amount") > 0)
    .where(col("trip_distance").between(0.5, 100))
    .where(col("passenger_count").between(1, 6))
    .where(col("dropoff_datetime") > col("pickup_datetime"))
)

Feature Engineering

In [23]:
# Derive calendar features from pickup_datetime: date, hour, day of month,
# month and year. All five are added in a single withColumns call.
df_trip_data = df_trip_data.withColumns(
    {
        "pickup_date": to_date("pickup_datetime"),
        "pickup_hour": hour("pickup_datetime"),
        "pickup_day": dayofmonth("pickup_datetime"),
        "pickup_month": month("pickup_datetime"),
        "pickup_year": year("pickup_datetime"),
    }
)

In [24]:
# extra_charges is the part of total_amount that is not the base fare,
# rounded to 2 decimals. `round` here is the pyspark.sql.functions version
# pulled in by the star import, not Python's builtin.
df_trip_data = df_trip_data.withColumn(
    "extra_charges", round(col("total_amount") - col("fare_amount"), 2)
)

In [25]:
# Trip duration in seconds: difference between dropoff and pickup after
# casting both to long. Assumes both columns are timestamps, so the cast
# gives epoch seconds. TODO confirm against the bronze schema.
df_trip_data = df_trip_data.withColumn(
    "trip_duration_seconds",
    col("dropoff_datetime").cast(LongType()) - col("pickup_datetime").cast(LongType()),
)

In [26]:
# Discard trips with a non-positive duration, then express the duration in
# minutes rounded to 2 decimals.
df_trip_data = (
    df_trip_data
    .where(col("trip_duration_seconds") > 0)
    .withColumn("trip_duration_minutes", round(col("trip_duration_seconds") / 60, 2))
)

In [27]:
# fare_per_mile = fare_amount / trip_distance, rounded to 2 decimals.
# The column name implies trip_distance is in miles. TODO confirm the unit.
df_trip_data = df_trip_data.withColumn(
    "fare_per_mile", round(col("fare_amount") / col("trip_distance"), 2)
)

In [28]:
# Trim fare_per_mile outliers: keep rows between the 1st and 99th percentiles
# (both bounds inclusive).
# Both cut-offs come from ONE approxQuantile call, so the data is scanned once
# instead of twice. relativeError=0.0 asks for exact quantiles, which is the
# most expensive setting, so avoiding a second pass matters here.
q_low, q_high = df_trip_data.approxQuantile("fare_per_mile", [0.01, 0.99], 0.0)

df_trip_data = df_trip_data.filter(
    col("fare_per_mile").between(q_low, q_high)
)

Keeping only required columns

In [29]:
# Keep only the columns written to the silver table, in this order.
df_trip_data = df_trip_data.select(
    [
        # identifiers and timestamps
        "VendorID",
        "pickup_datetime",
        "dropoff_datetime",
        # calendar features derived from pickup_datetime
        "pickup_date",
        "pickup_hour",
        "pickup_day",
        "pickup_month",
        "pickup_year",
        # trip attributes
        "RatecodeID",
        "PULocationID",
        "DOLocationID",
        "passenger_count",
        "trip_type",
        "payment_type",
        "trip_distance",
        # amounts and derived measures
        "fare_amount",
        "tip_amount",
        "extra_charges",
        "fare_per_mile",
        "trip_duration_minutes",
        "total_amount",
    ]
)

Validation

In [30]:
# Preview the cleaned trip records with the final column set.
df_trip_data.show()

+--------+-------------------+-------------------+-----------+-----------+----------+------------+-----------+----------+------------+------------+---------------+---------+------------+-------------+-----------+----------+-------------+-------------+---------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|pickup_date|pickup_hour|pickup_day|pickup_month|pickup_year|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_type|payment_type|trip_distance|fare_amount|tip_amount|extra_charges|fare_per_mile|trip_duration_minutes|total_amount|
+--------+-------------------+-------------------+-----------+-----------+----------+------------+-----------+----------+------------+------------+---------------+---------+------------+-------------+-----------+----------+-------------+-------------+---------------------+------------+
|       2|2025-05-01 00:17:04|2025-05-01 00:56:06| 2025-05-01|          0|         1|           5|       2025|         1|          25|     

In [31]:
# Sanity check: count trips whose rounded duration is exactly 0 minutes.
# Expected to be 0, since trips with non-positive duration were filtered out earlier.
df_trip_data.filter(col("trip_duration_minutes") == 0).count()

0

In [32]:
# Distribution of fare_per_mile after the percentile trim (count, mean, stddev, min, quartiles, max).
df_trip_data.select("fare_per_mile").summary().show()

+-------+------------------+
|summary|     fare_per_mile|
+-------+------------------+
|  count|            443469|
|   mean| 7.033619373620358|
| stddev|1.8435615096780271|
|    min|              3.93|
|    25%|              5.74|
|    50%|              6.77|
|    75%|              8.02|
|    max|             14.91|
+-------+------------------+



Write

In [33]:

# Save the cleaned trip records to the silver layer in Delta format.
# mode("overwrite") replaces whatever is already stored at this path.
(
    df_trip_data.write
    .format("delta")
    .mode("overwrite")
    .option("path", r"C:\Users\gauth\Desktop\Extended Medallion Architecture\storage_silver\trip_data")
    .save()
)
