# Exploratory Data Analysis with PySpark and Spark SQL

This notebook uses New York City taxi data from [TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

## Instructions

- Load and explore NYC taxi data from January 2019. The exercises can be completed using PySpark or Spark SQL (a subset of the questions will be re-answered using the language not chosen for the main work).
- Load the zone lookup table to answer the questions about the NYC boroughs.
- Load NYC taxi data from January 2025 and compare the two datasets.
- With any remaining time, work on the "Where to go from here" section.
- Lab due date is TBD (due dates will be updated in the README for the class repo).

In [0]:
# Define the name of the new catalog
catalog = 'taxi_eda_db'

# define variables for the trips data
schema = 'yellow_taxi_trips'
volume = 'data'
file_name = 'yellow_tripdata_2019-01.parquet'
table_name = 'tbl_yellow_taxi_trips'
path_volume = '/Volumes/' + catalog + "/" + schema + '/' + volume
path_table =  catalog + "." + schema
download_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2019-01.parquet'


In [0]:
# create the catalog/schema/volume
spark.sql('create catalog if not exists ' + catalog)
spark.sql('create schema if not exists ' + catalog + '.' + schema)
spark.sql('create volume if not exists ' + catalog + '.' + schema + '.' + volume)

In [0]:
# Get the data
# dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")


## Lab

### Part 1
This section can be completed using either PySpark commands or SQL commands. A later section re-answers a self-chosen subset of the questions using the language not used here (i.e. if PySpark is chosen for the main lab, SQL should be used to repeat some of the questions).

- Add a column that creates a unique key to identify each record, in order to answer questions about individual trips.
- Which trip has the highest passenger count?
- What is the average passenger count?
- What is the shortest/longest trip by distance? By time?
- Which single day is the busiest/slowest?
- What is the busiest/slowest time of day? (You may want to bucket these by hour or create times such as morning, afternoon, evening, late night.)
- On average, which day of the week is the slowest/busiest?
- Does trip distance or number of passengers affect the tip amount?
- What was the highest "extra" charge, and on which trip?
- Are there any data points that seem strange or look like outliers? (Make sure to explain your reasoning in a markdown cell.)

In [0]:
# Create the DataFrame. Parquet files store their own schema,
# so CSV-style options (header, inferSchema, sep) are not needed.
df_trips = spark.read.parquet(f"{path_volume}/{file_name}")

In [0]:
display(df_trips.limit(10))

#### Add an ID column

In [0]:
df_trips.columns

Check for duplicate rows.


In [0]:
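from pyspark.sql.functions import count

# Check for duplicate rows: group by every column and keep groups that occur more than once.
# (df, with its 'id' column, is only created in a later cell, so df_trips is used here.)
df_dupes = (
  df_trips
  .groupBy(df_trips.columns)
  .agg(
    count("*").alias("row_count")
  )
  .filter("row_count > 1")
)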

display(df_dupes)

Add the ID column.

In [0]:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

# Alternative: number the rows over a single window ordered by a literal column.
# The IDs are consecutive, but every row goes through one partition, which is bad for performance.
# window_spec = Window.orderBy(f.lit(1))
# df = df_trips.withColumn(
#     "id",
#     f.row_number().over(window_spec)
# )
# Chosen approach: monotonically_increasing_id() assigns IDs per partition.
# It performs well, but the IDs are unique and increasing rather than consecutive.
df = df_trips.withColumn(
    "id",
    f.monotonically_increasing_id()
)
display(df.limit(10))

Each row is unique.
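
The IDs produced by `monotonically_increasing_id()` look like they jump between very large values. The Spark documentation describes the current layout as the partition index in the upper 31 bits and the row position within the partition in the lower 33 bits. The following is a plain-Python sketch of that layout, not Spark's implementation; the helper names are hypothetical.

```python
from typing import Tuple

# Number of low bits that hold the row position within a partition,
# as described in the Spark documentation for monotonically_increasing_id().
RECORD_BITS = 33


def sketch_monotonic_id(partition_index: int, row_in_partition: int) -> int:
    """Hypothetical helper: compose an ID from a partition index and a row position."""
    return (partition_index << RECORD_BITS) | row_in_partition


def split_monotonic_id(generated_id: int) -> Tuple[int, int]:
    """Hypothetical helper: recover (partition_index, row_in_partition) from an ID."""
    return generated_id >> RECORD_BITS, generated_id & ((1 << RECORD_BITS) - 1)


# Two partitions with three rows each.
ids = [sketch_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
print(split_monotonic_id(8589934593))  # (1, 1)
```

The gap between 2 and 8589934592 is where the second partition starts, which is why the IDs are unique but not consecutive.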

#### Find the trip with the highest passenger count

In [0]:
max_passenger = df.agg(f.max("passenger_count")).collect()
display(df.filter(df.passenger_count == max_passenger[0][0]))

In [0]:
df.select(f.max("passenger_count")).collect()

#### Find the average passenger count

In [0]:

df.select(f.avg("passenger_count")).show()


#### Find the shortest/longest trip by distance and by time

In [0]:
min_distance = df.select(f.min("trip_distance")).collect()
display(df.filter(df.trip_distance == min_distance[0][0]))

max_distance = df.select(f.max("trip_distance")).collect()
df.filter(df.trip_distance == max_distance[0][0]).display()

In [0]:
df.printSchema()

In [0]:
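# Trip duration in minutes. abs() keeps the value non-negative even when
# the dropoff timestamp is earlier than the pickup timestamp.
df = df.withColumn(
    "trip_duration_min",
    f.abs(
        f.unix_timestamp(f.col("tpep_dropoff_datetime")) -
        f.unix_timestamp(f.col("tpep_pickup_datetime"))
    ) / 60
)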
df.limit(10).display()

In [0]:
df.select(f.min('trip_duration_min')).display()
df.select(f.max('trip_duration_min')).display()


The data set contains some errors: for the trips shown below, the dropoff time is earlier than the pickup time.

In [0]:
df.filter(df.tpep_dropoff_datetime<df.tpep_pickup_datetime).display()
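
Because `trip_duration_min` wraps the timestamp difference in `abs`, these reversed records appear as ordinary positive durations in the min/max results. A small plain-Python sketch of the effect, using a made-up record and a hypothetical helper name:

```python
from datetime import datetime


def signed_duration_min(pickup: datetime, dropoff: datetime) -> float:
    """Hypothetical helper: duration in minutes, negative if dropoff precedes pickup."""
    return (dropoff - pickup).total_seconds() / 60


# Made-up record in which the dropoff is 30 minutes before the pickup.
pickup = datetime(2019, 1, 1, 10, 30)
dropoff = datetime(2019, 1, 1, 10, 0)

signed = signed_duration_min(pickup, dropoff)
print(signed)       # -30.0: the sign exposes the bad record
print(abs(signed))  # 30.0: abs() makes it look like a normal 30-minute trip
```

Keeping a signed duration column alongside the absolute one is one way to keep such rows easy to filter out.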

#### The busiest day of the week

In [0]:
df_with_dow = df.select(
    f.dayofweek('tpep_pickup_datetime').alias('dayOfWeek')
)
df_with_dow.groupBy('dayOfWeek').agg(
    f.count('*').alias('num_trip')
).orderBy(
    'num_trip',
    ascending=False
).display()

The busiest day of the week is Thursday (day 5) and the slowest day is Sunday (day 1).
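
Spark's `dayofweek` numbers the days from 1 (Sunday) to 7 (Saturday), which differs from Python's `isoweekday` (1 = Monday). The sketch below reproduces the Spark numbering in plain Python so the day numbers above can be checked against a calendar; the helper and dictionary names are hypothetical.

```python
from datetime import date

# Spark's dayofweek convention: 1 = Sunday, ..., 7 = Saturday.
SPARK_DAY_NAMES = {
    1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
    5: "Thursday", 6: "Friday", 7: "Saturday",
}


def spark_dayofweek(d: date) -> int:
    """Hypothetical helper: map a date to Spark's 1 (Sunday) .. 7 (Saturday) numbering."""
    # isoweekday() returns 1 (Monday) .. 7 (Sunday); shift so that Sunday becomes 1.
    return d.isoweekday() % 7 + 1


thursday = date(2019, 1, 3)
print(spark_dayofweek(thursday), SPARK_DAY_NAMES[spark_dayofweek(thursday)])  # 5 Thursday
```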

#### The busiest/slowest time of day

In [0]:
df = df.withColumn(
    'pickup_dayPart',
    f.when(
        (f.hour('tpep_pickup_datetime') >= 6) & (f.hour('tpep_pickup_datetime') < 12), 'morning'
    ).when(
        (f.hour('tpep_pickup_datetime') >= 12) & (f.hour('tpep_pickup_datetime') < 17), 'afternoon'
    ).when(
        (f.hour('tpep_pickup_datetime') >= 17) & (f.hour('tpep_pickup_datetime') < 21), 'evening'
    ).otherwise('night')
)
df.limit(10).display()

In [0]:
df.select('pickup_dayPart')\
    .groupBy('pickup_dayPart')\
    .agg(f.count('*').alias('num_trip'))\
    .orderBy('num_trip')\
    .display()

The busiest time of the day is in the afternoon. The slowest time of the day is at night. 
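
The four day parts defined above do not cover the same number of hours, so raw trip counts favor the longer buckets. A plain-Python sketch that mirrors the `when` chain and divides by bucket width; the trip counts are made-up numbers for illustration only, and the helper names are hypothetical.

```python
from collections import Counter


def day_part(hour: int) -> str:
    """Same bucketing rules as the pickup_dayPart column."""
    if 6 <= hour < 12:
        return "morning"
    if 12 <= hour < 17:
        return "afternoon"
    if 17 <= hour < 21:
        return "evening"
    return "night"


# Number of clock hours that fall into each bucket.
HOURS_PER_PART = Counter(day_part(h) for h in range(24))
print(dict(HOURS_PER_PART))  # {'night': 9, 'morning': 6, 'afternoon': 5, 'evening': 4}

# Made-up counts, only to show the normalization step.
example_counts = {"morning": 600, "afternoon": 500, "evening": 400, "night": 450}
trips_per_hour = {part: n / HOURS_PER_PART[part] for part, n in example_counts.items()}
print(trips_per_hour)
```

With these illustrative numbers, morning, afternoon, and evening all come out at 100 trips per hour, even though their raw counts differ.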

#### On average, which day of the week is the slowest/busiest

In [0]:
# count the number of trips on each day
daily_trip = df.groupBy(f.to_date('tpep_pickup_datetime').alias('pickup_date'))\
                .agg(f.count('*').alias('num_trip_per_day'))\
                .orderBy('pickup_date')\
                .select('pickup_date',f.dayofweek('pickup_date').alias('dayOfWeek'),'num_trip_per_day')
daily_trip.display()


In [0]:
# average number of trips for each day of the week
daily_trip.groupBy('dayOfWeek')\
            .agg(f.avg('num_trip_per_day')\
            .alias('avg_trip_per_day'))\
            .orderBy('avg_trip_per_day',ascending=False)\
            .display()

On average, the busiest day of the week is Thursday (day 5) and the slowest day of the week is Monday (day 2).

#### Does trip distance or number of passengers affect the tip amount

In [0]:
df.stat.corr('trip_distance','tip_amount')

In [0]:
df.stat.corr('passenger_count','tip_amount')

Trip distance correlates more strongly with the tip amount than the number of passengers does.
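
`df.stat.corr` returns the Pearson correlation coefficient, which measures linear association between two columns and ranges from -1 to 1. The following plain-Python sketch shows the calculation on made-up values; the `pearson` helper is hypothetical and is not how Spark computes the value internally.

```python
from math import sqrt
from typing import Sequence


def pearson(xs: Sequence[float], ys: Sequence[float]) -> float:
    """Hypothetical helper: Pearson correlation of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Made-up values: tips grow linearly with distance, but not with passenger count.
distances = [1.0, 2.0, 3.0, 4.0]
passengers = [1.0, 2.0, 2.0, 1.0]
tips = [0.5, 1.0, 1.5, 2.0]

print(pearson(distances, tips))   # perfectly linear relationship
print(pearson(passengers, tips))  # no linear relationship
```

A coefficient describes association only; it does not show that distance causes larger tips.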

#### What was the highest "extra" charge, and on which trip

In [0]:

max_extra = df.select(f.max('extra')).collect()[0][0]
df.filter(df.extra == max_extra).display()

In [0]:
df.filter(df.trip_distance==0).display()

In [0]:
display(df)

Databricks visualization. Run in Databricks to view.

In [0]:
df.filter(
    (df.passenger_count == 0) & (df.trip_distance == 0)
).display()

Some trips have a trip_distance of 0 but a nonzero fare_amount.

### Part 2

- Using the code for loading the first dataset as an example, load the taxi zone lookup and answer the following questions.
- Which borough had the most pickups? Dropoffs?
- What are the busy/slow times by borough?
- What are the busiest days of the week by borough?
- What is the average trip distance by borough?
- What is the average trip fare by borough?
- What are the highest/lowest fare amounts for a trip, and which borough is associated with each?
- Load the dataset from the most recently available January. Is there a change to any of the average metrics?

### Part 3

- Choose 3 questions from above and re-answer them using the language you did not use for the main notebook (i.e. if you completed the exercise in Python, redo 3 questions in pure SQL). At least one of the questions to be redone must involve a join.
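
As a rough illustration of what a join-based SQL answer could look like, the sketch below counts pickups per borough. It uses Python's built-in `sqlite3` only as a stand-in so the query shape can be tried without a cluster; it is not Spark SQL. The table names `trips` and `zones`, the sample rows, and the column names `PULocationID`, `LocationID`, and `Borough` are assumptions that should be checked against the loaded data.

```python
import sqlite3

# In-memory stand-in tables with made-up rows (assumed column names).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE trips (id INTEGER, PULocationID INTEGER);
    CREATE TABLE zones (LocationID INTEGER, Borough TEXT);
    INSERT INTO trips VALUES (1, 10), (2, 10), (3, 20);
    INSERT INTO zones VALUES (10, 'Manhattan'), (20, 'Queens');
    """
)

# Join each trip to its pickup zone, then count trips per borough.
query = """
    SELECT z.Borough, COUNT(*) AS num_pickups
    FROM trips AS t
    JOIN zones AS z ON t.PULocationID = z.LocationID
    GROUP BY z.Borough
    ORDER BY num_pickups DESC
"""
rows = conn.execute(query).fetchall()
print(rows)  # [('Manhattan', 2), ('Queens', 1)]
conn.close()
```

In the notebook itself, a query of the same shape could be passed to `spark.sql(...)` after registering the two DataFrames with `createOrReplaceTempView`.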


### Part 4

As of Spark 4, DataFrames have native visualization support. Choose at least 3 questions from above and provide visualizations.


# Where to go from here

- Continue building the dataset by loading in more data. Start by completing the data for 2019 and calculating the busiest season (fall, winter, spring, summer).
- Explore a dataset or datasets of your choosing.
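
For the busiest-season exercise above, each pickup month needs a season label. A minimal sketch in plain Python, assuming meteorological seasons (winter is December through February, and so on); the helper name `season_of` is hypothetical, and the same mapping could be written in PySpark with `f.month` and `f.when`.

```python
def season_of(month: int) -> str:
    """Hypothetical helper: map a month number (1-12) to a meteorological season."""
    if month not in range(1, 13):
        raise ValueError(f"month must be in 1..12, got {month}")
    if month in (12, 1, 2):
        return "winter"
    if month in (3, 4, 5):
        return "spring"
    if month in (6, 7, 8):
        return "summer"
    return "fall"


# Season label for every month of the year.
seasons = {month: season_of(month) for month in range(1, 13)}
print(seasons[1], seasons[4], seasons[7], seasons[10])  # winter spring summer fall
```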