In [0]:
%sql
-- Set the session's default catalog and schema so unqualified table names
-- resolve to nyc_taxi.nyc_taxi_schema.
USE CATALOG nyc_taxi;
USE SCHEMA nyc_taxi_schema;

In [0]:
# Point the Spark session at the project catalog/schema (same as the %sql cell above),
# then load the yellow-taxi trips view by its fully qualified name.
for _stmt in ("USE CATALOG nyc_taxi", "USE SCHEMA nyc_taxi_schema"):
    spark.sql(_stmt)

df = spark.table("nyc_taxi.nyc_taxi_schema.yellow_trips_csv_v")

In [0]:
# Inspect the column names and types, then preview the loaded rows.
df.printSchema()
display(df)

In [0]:
# Descriptive statistics (count, mean, stddev, min, quartiles, max) for the numeric
# fare/distance fields of the raw data.
num_cols = [
    "trip_distance",
    "passenger_count",
    "fare_amount",
    "tip_amount",
    "tolls_amount",
    "mta_tax",
    "extra",
    "improvement_surcharge",
    "total_amount",
]
display(df.select(num_cols).summary())


In [0]:

# Start from rows that have the three core numeric fields populated.
clean = df.na.drop(subset=["fare_amount", "trip_distance", "passenger_count"])

# Keep only plausible trips: non-negative fare and distance, at least one passenger,
# a drop-off strictly after the pick-up, and no (0, 0) placeholder coordinates.
# Null values in any compared column make the predicate null, so those rows are dropped too.
is_valid_trip = (
    (clean["fare_amount"] >= 0)
    & (clean["trip_distance"] >= 0)
    & (clean["passenger_count"] > 0)
    & (clean["tpep_pickup_datetime"] < clean["tpep_dropoff_datetime"])
    & (clean["pickup_longitude"] != 0)
    & (clean["pickup_latitude"] != 0)
    & (clean["dropoff_longitude"] != 0)
    & (clean["dropoff_latitude"] != 0)
)
clean = clean.filter(is_valid_trip)
print(clean.count())
     

In [0]:
from pyspark.sql import functions as F

# Trip duration in whole seconds: casting a timestamp to long yields epoch seconds.
# NOTE(review): assumes tpep_*_datetime are TIMESTAMP columns; a STRING column would cast
# to null (or fail under ANSI mode) -- confirm with df.printSchema() above.
duration_s = (
    F.col("tpep_dropoff_datetime").cast("long")
    - F.col("tpep_pickup_datetime").cast("long")
)

# NOTE(review): "trip_duration (m)" / "trip_speed(mph)" contain spaces/parentheses, which
# Delta rejects as column names unless column mapping is enabled; rename before saving.
clean = clean.withColumn("trip_duration (m)", F.round(duration_s / 60, 2))

# Derive speed from the unrounded seconds, not from the 2-decimal minute column: rounding
# first biases short trips (a 1 s trip becomes 0.02 min = 1.2 s, ~17% too slow).
# Sub-second trips truncate to 0 s after the long cast; yield null instead of dividing by zero.
clean = clean.withColumn(
    "trip_speed(mph)",
    F.when(duration_s > 0, F.round(F.col("trip_distance") / (duration_s / 3600), 2)),
)
display(clean)

In [0]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Mean fare and trip distance per passenger count, using the pandas API on Spark.
psdf = clean.pandas_api()
psdf = (
    psdf.groupby("passenger_count")[["fare_amount", "trip_distance"]]
    .mean()
    # groupby on a distributed frame returns groups in no guaranteed order;
    # sort by passenger_count so the displayed table is stable and readable.
    .sort_index()
    .reset_index()
)
display(psdf)

In [0]:
# Number of trips per pickup hour (0-23), busiest hour first.
by_hour = (
    clean
    .withColumn("pickup_hour", F.hour(F.col("tpep_pickup_datetime")))
    .groupBy("pickup_hour")
    .count()
    .orderBy(F.col("count").desc())
)
display(by_hour)

In [0]:
# Approximate "neighbourhoods" by snapping pickup coordinates to a lat/lon grid, then rank
# grid cells by average fare. With 2 decimals a cell is 0.01 deg on each side (about
# 1.1 km north-south and 0.84 km east-west at NYC's latitude). Cells with fewer than
# MIN_TRIPS_PER_CELL pickups are dropped so a few unusual trips cannot top the ranking.
GRID_DECIMALS = 2          # decimal places kept when rounding coordinates (0.01 deg cells)
MIN_TRIPS_PER_CELL = 200   # minimum pickups for a cell to be reported

cells = (
    clean
    .withColumn("pickup_lat_r", F.round("pickup_latitude", GRID_DECIMALS))
    .withColumn("pickup_lon_r", F.round("pickup_longitude", GRID_DECIMALS))
)

avg_fare_by_cell = (
    cells
    .groupBy("pickup_lat_r", "pickup_lon_r")
    .agg(F.avg("fare_amount").alias("avg_fare"), F.count("*").alias("n"))
    .filter(F.col("n") >= MIN_TRIPS_PER_CELL)
    .orderBy(F.desc("avg_fare"))
)
display(avg_fare_by_cell)


In [0]:
# 6a: distribution of trip distances.
# NOTE(review): this cell was previously marked "6a still needs fixing"; the changes below
# address the long-tail x-axis and the full-table collect -- confirm that was the open issue.
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

bin_count = 50
DIST_UPPER_QUANTILE = 0.99   # plot distances up to this quantile (tail is clipped)
MAX_PLOT_ROWS = 200_000      # cap on rows collected to the driver for plotting
SEED = 42                    # makes the down-sample reproducible

# A few extreme trip_distance values stretch the x-axis so far that nearly every real trip
# falls into the first bin. Clip at a high quantile computed in Spark so the bins cover
# the range where the data actually is.
quantiles = clean.approxQuantile("trip_distance", [DIST_UPPER_QUANTILE], 0.001)
distances_sdf = clean.select("trip_distance")
if quantiles:  # approxQuantile returns [] when there are no non-null values
    distances_sdf = distances_sdf.filter(F.col("trip_distance") <= quantiles[0])

# toPandas() materialises every row on the driver, and the KDE is slow on millions of
# points, so down-sample large inputs. A uniform random sample keeps the distribution's
# shape; the y-axis then shows sample counts.
n_rows = distances_sdf.count()
is_sampled = n_rows > MAX_PLOT_ROWS
if is_sampled:
    distances_sdf = distances_sdf.sample(fraction=MAX_PLOT_ROWS / n_rows, seed=SEED)
trip_distances = distances_sdf.toPandas()["trip_distance"]

fig, ax = plt.subplots(figsize=(9, 5))
sns.histplot(trip_distances, bins=bin_count, kde=True, ax=ax)
ax.set_title(f"Trip Distance Distribution (miles, up to the {DIST_UPPER_QUANTILE:.0%} quantile)")
ax.set_xlabel("Trip distance (miles)")
ax.set_ylabel("Count (sample)" if is_sampled else "Count")
plt.show()

In [0]:
import matplotlib.pyplot as plt

# Average fare per pickup hour. The aggregation runs in Spark; only the (at most 24-row)
# result is collected to pandas for plotting.
fare_by_hour_sdf = (
    clean
    .withColumn("pickup_hour", F.hour(F.col("tpep_pickup_datetime")))
    .groupBy("pickup_hour")
    .agg(F.avg("fare_amount").alias("avg_fare"))
    .orderBy("pickup_hour")
)

fare_by_hour = fare_by_hour_sdf.toPandas()

# Explicit Axes API, ending with plt.show() like the histogram cell, so the figure
# renders once and the cell does not leak pyplot state.
fig, ax = plt.subplots(figsize=(9, 5))
ax.bar(fare_by_hour["pickup_hour"], fare_by_hour["avg_fare"])
ax.set_title("Average Fare by Hour of Day")
ax.set_xlabel("Hour of day (0–23)")
ax.set_ylabel("Average fare (USD)")
ax.set_xticks(range(0, 24))
fig.tight_layout()
plt.show()
