In [0]:
from pyspark.sql.functions import col


def _read_bronze(table_name):
    """Load a table by its fully qualified name, without its `_rescued_data` column.

    `_rescued_data` is presumably the rescued-data column added at ingestion; it is
    not needed downstream, so every bronze source is read through this helper.
    """
    return spark.table(table_name).drop(col("_rescued_data"))


weather_df = _read_bronze('`nyc_taxis_weather`.bronze_nyc_taxis.bronze_weather_oct_dec_2024')
taxis_df = _read_bronze('`nyc_taxis_weather`.bronze_nyc_taxis.bronze_nyc_taxis_oct_dec_2024')
lookup_df = _read_bronze('`nyc_taxis_weather`.bronze_nyc_taxis.bronze_taxi_zone_lookup')
# Weather condition codes: a .csv table uploaded manually
# (reference: https://dev.meteostat.net/formats.html#weather-condition-codes)
w_code_df = _read_bronze('`nyc_taxis_weather`.bronze_nyc_taxis.bronze_weather_codes')

In [0]:
# `broadcast` must come from pyspark.sql.functions (the DataFrame join hint);
# `from pyspark import broadcast` binds the pyspark.broadcast *module*, which is not callable.
from pyspark.sql.functions import broadcast, col, date_trunc

# Truncate pickup time to the hour so it can be matched against the weather data,
# which is available for full hours only.
dt_taxis_hour = taxis_df.withColumn("pickup_hour", date_trunc("hour", col("tpep_pickup_datetime")))

# The weather table is small (hourly rows for Oct-Dec 2024, i.e. a few thousand records
# at most), so broadcast it explicitly instead of shuffling the much larger trip table.
# Columns are dropped by name (strings): PySpark < 3.4 raises TypeError when drop()
# receives more than one Column object. drop() ignores names that do not exist.
dt_taxis_weather = (
    dt_taxis_hour
    .join(broadcast(weather_df), dt_taxis_hour.pickup_hour == weather_df.time, "left")
    .drop("datetime", "time", "pickup_hour")
)

# The zone lookup (~250 records) is renamed once per trip end so both copies can be
# joined without column-name collisions. `service_zone` is dropped from the pickup copy
# only, so the single `service_zone` column in the result describes the DROP-OFF zone.
# NOTE(review): consider renaming it to DOServiceZone (and keeping a PUServiceZone);
# that would change the silver table schema, so it is left unchanged here.
pu_zones = (
    lookup_df
    .withColumnRenamed("LocationID", "PULocationID_tmp")
    .withColumnRenamed("Borough", "PUBorough")
    .withColumnRenamed("Zone", "PUZone")
    .drop("service_zone")
)
do_zones = (
    lookup_df
    .withColumnRenamed("LocationID", "DOLocationID_tmp")
    .withColumnRenamed("Borough", "DOBorough")
    .withColumnRenamed("Zone", "DOZone")
)

# Join pickup/drop-off zone names and the manually uploaded weather-code list (all small
# lookup tables, hence broadcast) to build the final table ready for analysis.
result_table = (
    dt_taxis_weather
    .join(broadcast(pu_zones), dt_taxis_weather.PULocationID == col("PULocationID_tmp"), "left")
    .join(broadcast(do_zones), dt_taxis_weather.DOLocationID == col("DOLocationID_tmp"), "left")
    .join(broadcast(w_code_df), dt_taxis_weather.coco == col("w_code"), "left")
    # Ride duration in minutes: the difference of the timestamps cast to long (epoch
    # seconds, assuming the tpep_* columns are TimestampType) divided by 60.
    .withColumn(
        "ride_duration",
        (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60,
    )
    # Drop the helper join keys that are no longer needed after the joins.
    .drop("PULocationID_tmp", "DOLocationID_tmp", "w_code", "coco")
)


In [0]:
# Persist the enriched trips as the silver table, replacing its previous contents.
# `mergeSchema` lets columns that are new in result_table be added to the table schema.
# NOTE(review): assuming the default Delta format, overwrite + mergeSchema merges schemas
# rather than replacing them, so columns removed from result_table would likely remain in
# the table; `overwriteSchema` is the option that replaces the schema. Confirm intent.
SILVER_TABLE = "`nyc_taxis_weather`.silver_nyc_taxis.silver_nyc_taxis_weather_oct_dec"

(
    result_table.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(SILVER_TABLE)
)