# Exploratory Data Analysis with PySpark and Spark SQL

The following notebook uses New York City taxi data from [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

## Instructions

- Load and explore NYC taxi data from January 2019. The exercises can be executed using PySpark or Spark SQL (a subset of the questions will be re-answered using the language not chosen for the main work).
- Load the zone lookup table to answer the questions about NYC boroughs.
- Load NYC taxi data from January 2025 and compare the two datasets.
- With any remaining time, work on the "Where to go from here" section.
- The lab due date is TBD (due dates will be updated in the README of the class repo).

In [0]:
# Define the name of the new catalog
catalog = 'taxi_eda_db'

# define variables for the trips data
schema = 'yellow_taxi_trips'
volume = 'data'
file_name = 'yellow_tripdata_2019-01.parquet'
table_name = 'tbl_yellow_taxi_trips'
path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
path_table = f"{catalog}.{schema}"
download_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet'

In [0]:
# create the catalog/schema/volume
spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

In [0]:
# Get the data
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

In [0]:
# create the dataframe
df_trips = spark.read.parquet(f"{path_volume}/{file_name}")

In [0]:
# Show the dataframe
df_trips.show(5)

## Lab

### Part 1
This section can be completed using either PySpark or SQL commands. (A later section asks you to re-answer a self-chosen subset of the questions in the language not used for the main section, i.e. if PySpark is chosen for the main lab, SQL should be used to repeat some of the questions.)

- Add a column with a unique key that identifies each record, in order to answer questions about individual trips
- Which trip has the highest passenger count?
- What is the average passenger count?
- Shortest/longest trip by distance? By time?
- Highest/lowest fare amounts for a trip, and which borough is associated with each?
- Busiest/slowest single day
- Busiest/slowest time of day (you may want to bucket these by hour or create periods such as morning, afternoon, evening, late night)
- On average, which day of the week is slowest/busiest?
- Does trip distance or the number of passengers affect the tip amount?
- What was the highest "extra" charge, and on which trip?
- Are there any data points that seem strange or are outliers? (Make sure to explain your reasoning in a markdown cell.)

In [0]:
from pyspark.sql.functions import monotonically_increasing_id

df_with_id = df_trips.withColumn("RecordID", monotonically_increasing_id())   
df_with_id.show(5)

In [0]:
from pyspark.sql.functions import max, col

max_value = df_with_id.agg(max("passenger_count")).collect()[0][0]
max_pass_count = df_with_id.filter(col("passenger_count") == max_value)
max_pass_count.show()

In [0]:
from pyspark.sql.functions import avg, col

avg_value = df_with_id.agg(avg("passenger_count")).collect()[0][0]
print("Average passenger count: ", avg_value)

In [0]:
from pyspark.sql.functions import max, min, col, expr

# Trip duration in seconds between pickup and drop-off
df_trip_time = df_with_id.withColumn(
    "trip_time",
    expr("timestampdiff(SECOND, tpep_pickup_datetime, tpep_dropoff_datetime)")
)

max_time = df_trip_time.agg(max("trip_time")).collect()[0][0]
min_time = df_trip_time.agg(min("trip_time")).collect()[0][0]
max_distance = df_with_id.agg(max("trip_distance")).collect()[0][0]
min_distance = df_with_id.agg(min("trip_distance")).collect()[0][0]

max_distance_df = df_with_id.filter(col("trip_distance") == max_distance)
min_distance_df = df_with_id.filter(col("trip_distance") == min_distance)
max_time_df = df_trip_time.filter(col("trip_time") == max_time)
min_time_df = df_trip_time.filter(col("trip_time") == min_time)

max_distance_df.show(5)
min_distance_df.show(5)
max_time_df.show(5)
min_time_df.show(5)

In [0]:
catalog = "taxi_eda_db"
schema = "yellow_taxi_trips"
volume = "data"
file_name = "taxi_zone_lookup.csv"
table_name = "taxi_zone_lookup"
download_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

df_zones = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(f"{path_volume}/{file_name}")
)

df_zones.show(5)


In [0]:
from pyspark.sql.functions import max, min, col

max_fare_amount = df_with_id.agg(max("fare_amount")).collect()[0][0]
min_fare_amount = df_with_id.agg(min("fare_amount")).collect()[0][0]

max_fare_amount_df = df_with_id.filter(col("fare_amount") == max_fare_amount).select("fare_amount", "DOLocationID")
min_fare_amount_df = df_with_id.filter(col("fare_amount") == min_fare_amount).select("fare_amount", "DOLocationID")

# Attach the borough of each trip's drop-off location (DOLocationID)
df_max_joined = max_fare_amount_df.join(df_zones, max_fare_amount_df.DOLocationID == df_zones.LocationID, how="inner")
df_min_joined = min_fare_amount_df.join(df_zones, min_fare_amount_df.DOLocationID == df_zones.LocationID, how="inner")

df_max_joined.show(5)
df_min_joined.show(5)

In [0]:
from pyspark.sql.functions import to_date

df_dated = df_with_id.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
df_business_counts = df_dated.groupBy("pickup_date").count()

max_day_business = df_business_counts.agg(max("count")).collect()[0][0]
min_day_business = df_business_counts.agg(min("count")).collect()[0][0]

max_day_business_df = df_business_counts.filter(col("count") == max_day_business)
min_day_business_df = df_business_counts.filter(col("count") == min_day_business)

max_day_business_df.show(5)
min_day_business_df.show(5)

In [0]:
from pyspark.sql.functions import date_format, avg, max, min, col

# Count trips per calendar date, then average those daily counts within each weekday,
# so that weekdays occurring more often in the month are not favoured by raw totals
df_daily_counts = (
    df_dated.groupBy("pickup_date").count()
    .withColumn("day_of_week", date_format("pickup_date", "EEEE"))
)
df_day_of_week_avg = df_daily_counts.groupBy("day_of_week").agg(avg("count").alias("avg_trips"))

max_dow_business = df_day_of_week_avg.agg(max("avg_trips")).collect()[0][0]
min_dow_business = df_day_of_week_avg.agg(min("avg_trips")).collect()[0][0]

max_dow_business_df = df_day_of_week_avg.filter(col("avg_trips") == max_dow_business)
min_dow_business_df = df_day_of_week_avg.filter(col("avg_trips") == min_dow_business)

max_dow_business_df.show(5)
min_dow_business_df.show(5)
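
Totalling trips per weekday across a single month is skewed by the calendar: some weekdays occur five times and others four, so a weekday can come out "busiest" by total alone. A quick standard-library check of how often each weekday occurs in January 2019 (the helper name is illustrative):

```python
import calendar
from collections import Counter
from datetime import date, timedelta

# Fixed English names so the result does not depend on the system locale
WEEKDAYS = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")

def weekday_occurrences(year: int, month: int) -> Counter:
    """Count how many times each weekday occurs in the given month."""
    days_in_month = calendar.monthrange(year, month)[1]
    first = date(year, month, 1)
    return Counter(
        WEEKDAYS[(first + timedelta(days=offset)).weekday()] for offset in range(days_in_month)
    )

jan_2019 = weekday_occurrences(2019, 1)
print(jan_2019)
```

January 2019 starts on a Tuesday, so Tuesday, Wednesday, and Thursday each contribute one extra day to their totals.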

In [0]:
from pyspark.sql.functions import corr

df_with_id.select(
    corr("trip_distance", "tip_amount").alias("corr_distance_tip"),
    corr("passenger_count", "tip_amount").alias("corr_passengers_tip")
).show()

In [0]:
from pyspark.sql.functions import max, col

max_extra_amount = df_with_id.agg(max("extra")).collect()[0][0]
print(f"Max extra amount for a trip: {max_extra_amount}")
max_extra_amount_df = df_with_id.filter(col("extra") == max_extra_amount)
max_extra_amount_df.show(5)

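The following outlier detection uses the well-known z-score method, which flags values that lie far from the mean, measured in standard deviations (the cell below uses a cut-off of 2). It is simple to compute, but it assumes the data are roughly normally distributed, and the mean and standard deviation are themselves pulled by the extreme values it is trying to find.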

In [0]:
from pyspark.sql import functions as F

def z_score_outliers(df, col_name, threshold=2):
    # Mean and standard deviation of the column, computed on the DataFrame passed in
    stats = df.agg(
        F.mean(col_name).alias("mean"),
        F.stddev(col_name).alias("std")
    ).collect()[0]
    mean = stats["mean"]
    std = stats["std"]
    # Flag values more than `threshold` standard deviations above or below the mean
    df_z_outliers = df.filter(
        F.abs((F.col(col_name) - mean) / std) > threshold
    )
    return df_z_outliers

df_z_outliers_dist = z_score_outliers(df_with_id, "trip_distance")
print("Trip distance outliers:")
df_z_outliers_dist.show(5)

df_z_outliers_pass = z_score_outliers(df_with_id, "passenger_count")
print("Passenger count outliers:")
df_z_outliers_pass.show(5)

df_z_outliers_tot = z_score_outliers(df_with_id, "total_amount")
print("Total amount outliers:")
df_z_outliers_tot.show(5)

### Part 2

- Using the code for loading the first dataset as an example, load the taxi zone lookup table and answer the following questions:
- Which borough had the most pickups? The most drop-offs?
- What are the busy/slow times by borough?
- What are the busiest days of the week by borough?
- What is the average trip distance by borough?
- What is the average trip fare by borough?
- Load the dataset from the most recently available January. Is there a change in any of the average metrics?

In [0]:
from pyspark.sql.functions import broadcast, col

zones = broadcast(df_zones.select("LocationID", "Borough"))

df_trips_zones = (
    df_trips
    .join(
        zones.select(
            col("LocationID").alias("PULocationID"),
            col("Borough").alias("PickupBorough")
        ),
        on="PULocationID",
        how="left"
    )
    .join(
        zones.select(
            col("LocationID").alias("DOLocationID"),
            col("Borough").alias("DropoffBorough")
        ),
        on="DOLocationID",
        how="left"
    )
)

df_final = df_trips_zones.drop("PULocationID", "DOLocationID")

pickup_counts = (
    df_final.groupBy("PickupBorough")
            .count()
            .orderBy(col("count").desc())
)
pickup_counts.show(1)

dropoff_counts = (
    df_final.groupBy("DropoffBorough")
            .count()
            .orderBy(col("count").desc())
)
dropoff_counts.show(1)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import hour, col, row_number

df_timed = (
    df_final
    .withColumn("pickup_hour",  hour("tpep_pickup_datetime")) 
    .withColumn("dropoff_hour", hour("tpep_dropoff_datetime"))
)

pickup_activity = (
    df_timed
    .groupBy("PickupBorough", "pickup_hour")
    .count()
    .orderBy("PickupBorough", col("count").desc())
)
dropoff_activity = (
    df_timed
    .groupBy("DropoffBorough", "dropoff_hour")
    .count()
    .orderBy("DropoffBorough", col("count").desc())
)

w1 = Window.partitionBy("PickupBorough").orderBy(col("count").desc())
busiest_pickups = (
    pickup_activity
    .withColumn("rank", row_number().over(w1))
    .filter("rank = 1")
    .select("PickupBorough", "pickup_hour", "count")
)

w2 = Window.partitionBy("PickupBorough").orderBy(col("count").asc())
slowest_pickups = (
    pickup_activity
    .withColumn("rank", row_number().over(w2))
    .filter("rank = 1")
    .select("PickupBorough", "pickup_hour", "count")
)

busiest_pickups.show()
slowest_pickups.show()

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import date_format, col, row_number

df_days = df_final.withColumn("pickup_day", date_format("tpep_pickup_datetime", "EEEE"))
pickup_by_day = (
    df_days
    .groupBy("PickupBorough", "pickup_day")
    .count()
)

w = Window.partitionBy("PickupBorough").orderBy(col("count").desc())
busiest_pickup_days = (
    pickup_by_day
    .withColumn("rank", row_number().over(w))
    .filter("rank = 1")
    .select("PickupBorough", "pickup_day", "count")
)

busiest_pickup_days.show()

In [0]:
from pyspark.sql.functions import avg, col

avg_distance_pickup = (
    df_final
    .groupBy("PickupBorough")
    .agg(avg("trip_distance").alias("avg_trip_distance"))
    .orderBy(col("avg_trip_distance").desc())
)

avg_distance_pickup.show()

In [0]:
from pyspark.sql.functions import avg, col

avg_trip_fare = (
    df_final
    .groupBy("PickupBorough")
    .agg(avg("fare_amount").alias("avg_trip_fare"))
    .orderBy(col("avg_trip_fare").desc())
)

avg_trip_fare.show()

In [0]:
catalog = "taxi_eda_db"
schema = "yellow_taxi_trips"
volume = "data"
file_name = "yellow_tripdata_2025-01.parquet"
table_name = "tbl_yellow_taxi_trips_recent"
download_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet"

spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")

# Parquet files embed their own schema, so CSV options such as header/inferSchema are not needed
df_taxi_trips_recent = spark.read.parquet(f"{path_volume}/{file_name}")

df_taxi_trips_recent.show(5)


In [0]:
from pyspark.sql.functions import avg, lit

def compute_metrics(df, label):
    return (
        df.agg(
            avg("trip_distance").alias("avg_distance"),
            avg("fare_amount").alias("avg_fare"),
            avg("passenger_count").alias("avg_passengers")
        ).withColumn("dataset", lit(label))
    )

metrics_old = compute_metrics(df_trips, "previous")
metrics_new = compute_metrics(df_taxi_trips_recent, "recent")

comparison = metrics_old.unionByName(metrics_new)
comparison.show()

### Part 3

- Choose 3 questions from above and re-answer them using the language you did not use for the main notebook (i.e. if you completed the exercise in Python, redo 3 questions in pure SQL). At least one of the redone questions must involve a join.

In [0]:
df_trips.createOrReplaceTempView("trips")
df_zones.createOrReplaceTempView("zones")

In [0]:
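%sql
-- Shortest and longest trips by duration
WITH durations AS (
  SELECT tpep_pickup_datetime, tpep_dropoff_datetime,
         timestampdiff(SECOND, tpep_pickup_datetime, tpep_dropoff_datetime) AS trip_seconds
  FROM trips
)
SELECT * FROM durations
WHERE trip_seconds = (SELECT MIN(trip_seconds) FROM durations)
   OR trip_seconds = (SELECT MAX(trip_seconds) FROM durations)
ORDER BY trip_seconds;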

In [0]:
%sql
SELECT
  z.Borough AS PickupBorough,
  AVG(t.trip_distance) AS avg_trip_distance
FROM trips t
JOIN zones z 
ON t.PULocationID = z.LocationID
GROUP BY z.Borough
ORDER BY avg_trip_distance DESC;

In [0]:
%sql
SELECT
  z.Borough AS PickupBorough,
  AVG(t.fare_amount) AS avg_fare_amount
FROM trips t
JOIN zones z 
ON t.PULocationID = z.LocationID
GROUP BY z.Borough
ORDER BY avg_fare_amount DESC;


### Part 4

As of Spark 4, DataFrames have native visualization support. Choose at least 3 questions from above and provide visualizations.


In [0]:
display(busiest_pickups)

In [0]:
display(avg_trip_fare)

In [0]:
display(comparison)

## Where to go from here

- Continue building the dataset by loading in more data, start by completing the data for 2019 and calculating the busiest season (fall, winter, spring, summer)
- Explore one or more datasets of your choosing