Bronze Layer

In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import to_date, col, udf, from_unixtime, split, date_format, unix_timestamp

# Reuse the active Spark session when one exists; otherwise create one for this ETL run.
spark = SparkSession.builder.appName("NYC_Trip_ETL").getOrCreate()

# Bronze inputs.
# Parquet files embed their own schema, so the CSV-only `inferSchema` / `header`
# options (which the Parquet reader ignores) are not passed here.
trip_df = spark.read.parquet("/Volumes/workspace/nyc_trips_etl/bronze_layer/yellow_tripdata.parquet")
# The weather file is a CSV: the first row is treated as the header and column
# types are inferred from the data.
weather_df = spark.read.csv('/Volumes/workspace/nyc_trips_etl/bronze_layer/weather.csv', inferSchema=True, header=True)


Silver Layer

In [0]:
# Columns that are not carried forward into the silver layer.
trip_unused_cols = [
    'store_and_fwd_flag',
    "Airport_fee",
    "DOLocationID",
    "PULocationID",
    "RatecodeID",
    "congestion_surcharge",
    "extra",
    "mta_tax",
    "payment_type",
    "tolls_amount",
    "improvement_surcharge",
    "congestion",
    "fare_amount",
    "tip_amount",
]
weather_unused_cols = [
    "DEWP",
    "FRSHTT",
    "MXSPD",
    "SLP",
    "SNDP",
    "STP",
    "VISIB",
    "WDSP",
    "GUST",
    "STATION",
]

# Prune both frames down to the columns used by the later stages.
trip_df = trip_df.drop(*trip_unused_cols)
weather_df = weather_df.drop(*weather_unused_cols)


In [0]:
# Keep only complete trip records that carried at least one passenger.
trip_df = (
    trip_df
    .na.drop()
    .filter(col("passenger_count") > 0)
)

# Drop weather records containing nulls. The result is bound back to
# `weather_df` so that the silver-layer write, which persists `weather_df`,
# stores the null-free frame rather than the unfiltered one.
weather_df = weather_df.na.drop()

In [0]:
# Trip length in whole minutes: difference between the drop-off and pickup
# timestamps in epoch seconds, divided by 60 and cast to int.
pickup_secs = unix_timestamp('tpep_pickup_datetime')
dropoff_secs = unix_timestamp('tpep_dropoff_datetime')
duration_minutes = ((dropoff_secs - pickup_secs) / 60).cast('int')

# Pickup day rendered with the "yyyy-MM-dd" pattern.
pickup_day = date_format(col("tpep_pickup_datetime"), "yyyy-MM-dd")

# NOTE(review): 'trip_duartion' is misspelled, but the gold-layer aggregation
# reads the column under this exact name, so it is kept unchanged here.
trip_df = (
    trip_df
    .withColumn('trip_duartion', duration_minutes)
    .withColumn('pickup_date', pickup_day)
    # The raw timestamps are no longer needed once the derived columns exist.
    .drop('tpep_pickup_datetime', 'tpep_dropoff_datetime')
)

# Persist both frames to the silver layer as Delta tables, replacing prior contents.
weather_silver_path = "/Volumes/workspace/nyc_trips_etl/silver_layer/weather_cleaned"
trip_silver_path = "/Volumes/workspace/nyc_trips_etl/silver_layer/trip_cleaned"

weather_df.write.format("delta").mode("overwrite").save(weather_silver_path)
(
    trip_df.write
    .format("delta")
    .mode("overwrite")
    .option('mergeSchema', 'true')
    .save(trip_silver_path)
)

Gold Layer

In [0]:
# Reload the silver-layer Delta tables written by the previous stage.
weather_cleaned_df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/nyc_trips_etl/silver_layer/weather_cleaned")
)
trip_cleaned_df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/nyc_trips_etl/silver_layer/trip_cleaned")
)

# Preview the first five rows of each table, weather first.
for silver_preview in (weather_cleaned_df, trip_cleaned_df):
    display(silver_preview.limit(5))

STATION,DATE,MAX,MIN,PRCP,TEMP
,2023-01-01,55.0,50.0,0.28,52.4
,2023-01-02,55.9,48.9,0.0,52.0
,2023-01-03,57.0,46.4,0.04,51.6
,2023-01-04,66.0,46.9,0.4,59.1
,2023-01-05,66.0,46.9,0.03,49.0


VendorID,passenger_count,trip_distance,total_amount,trip_duartion,pickup_date
2,1,1.72,22.7,19,2024-01-01
1,1,1.8,18.75,6,2024-01-01
1,1,4.7,31.3,17,2024-01-01
1,1,1.4,17.0,8,2024-01-01
1,1,0.8,16.1,6,2024-01-01


In [0]:
from pyspark.sql import functions as F

# Attach each trip to the weather record whose DATE equals the trip's pickup
# date. The inner join keeps only rows that have a match on both sides.
joined_df = weather_cleaned_df.join(
    trip_cleaned_df,
    on=trip_cleaned_df.pickup_date == weather_cleaned_df.DATE,
    how='inner',
)

# One row per (pickup_date, PRCP) with the trip count and the integer-truncated
# averages of trip duration and total amount.
# The cast is applied first and the alias last, so each output column name is
# set explicitly on the final expression instead of depending on Spark carrying
# an inner alias through the cast.
merged_df = (
    joined_df
    .groupBy('pickup_date', 'PRCP')
    .agg(
        F.count('*').alias('trip_count'),
        F.avg('trip_duartion').cast('int').alias('avg_duration'),
        # NOTE(review): 'avg_fare' is the average of `total_amount`, not of a fare-only column.
        F.avg('total_amount').cast('int').alias('avg_fare'),
    )
)
display(merged_df.limit(5))

# Publish the aggregate to the gold layer as a Delta table, replacing prior contents.
(
    merged_df.write
    .format("delta")
    .mode("overwrite")
    .option('mergeSchema', 'true')
    .save("/Volumes/workspace/nyc_trips_etl/gold_layer/taxi_weather_delta")
)

pickup_date,PRCP,trip_count,avg_duration,avg_fare
2024-01-24,0.06,100376,16,26
2024-01-02,0.02,72668,16,30
2024-01-22,0.0,81687,15,27
2024-01-03,0.0,79670,16,28
2024-01-20,0.04,99194,13,23


In [0]:
# Example: compare fares on rainy vs. clear days by averaging the daily
# `avg_fare` values for each distinct PRCP value.
# `avg` is referenced through the `F` alias (pyspark.sql.functions) because the
# bare name `avg` is not imported anywhere in the notebook and would raise a
# NameError on a fresh run.
display(merged_df.groupBy("PRCP").agg(F.avg("avg_fare").alias("mean_fare")))


PRCP,mean_fare
1.95,26.0
0.0,26.33333333333333
0.11,27.0
0.13,28.0
0.54,28.0
0.03,24.0
0.81,26.0
0.04,23.0
0.05,26.0
0.01,30.0
