In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType, LongType

In [2]:
# Start (or reuse, via getOrCreate) a local Spark session running on all local cores
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Question 1: Install PySpark 
### What is the installed Spark version?
The version of Spark installed on this machine is 3.5.0. 

In [3]:
# Report the Spark version the active session is running (answers Question 1)
print("PySpark version:", spark.version)

PySpark version: 3.5.0


In [4]:
# Load the raw October 2019 FHV CSV (first row holds the column names) and
# spread it across 6 partitions before persisting it as parquet
csv_path = '../data/source/fhv_tripdata_2019-10.csv.gz'

csv_df = (
    spark.read
    .option("header", "true")
    .csv(csv_path)
    .repartition(6)
)

# Write the repartitioned frame as parquet, replacing any output from a previous run
csv_df.write.parquet('../data/output/csv_input/2019/10', mode="Overwrite")


## Question 2: Repartition 
### Repartition the Spark Dataframe to 6 partitions and save it to parquet. 
After repartitioning the Dataframe, the resulting parquet files were roughly 6MB each. 

In [5]:
# Load the October 2019 FHV trips from the parquet source and spread them across 6 partitions
# (the write to disk happens in the next cell)
file_path = '../data/source/fhv_tripdata_2019-10.parquet'
df_fhv = spark.read.parquet(file_path).repartition(6)

# Sanity check: the repartitioned frame should report 6 partitions
df_fhv.rdd.getNumPartitions()

6

In [6]:
# Persist the repartitioned FHV frame as parquet, replacing any output from a previous run
df_fhv.write.parquet('../data/output/2019/10/', mode="Overwrite")

In [7]:
# Read the taxi zone lookup table (CSV whose first row holds the column names)
taxi_path = '../data/source/taxi_zone_lookup.csv'

df_taxi = spark.read \
            .format('csv') \
            .option('header', 'true') \
            .load(taxi_path)

# Without a schema every CSV column is read as a string. Cast the join key to an
# integer so joins against the FHV location IDs (presumably numeric in the parquet
# source -- TODO confirm) compare numbers explicitly instead of relying on Spark's
# implicit string-to-number coercion, which behaves differently under ANSI mode.
df_taxi = df_taxi.withColumn('LocationID', F.col('LocationID').cast(IntegerType()))

# Register as a temp view so later SQL queries can reference it as `taxi_lookup`
df_taxi.createOrReplaceTempView("taxi_lookup")

In [8]:
# Derive the helper columns used by the remaining homework questions:
#   pickup_date                -> calendar date of the pickup (used to group trips per day)
#   pickup/dropoff_*_seconds   -> the timestamps cast to LongType (seconds)
#   trip_duration_hours        -> dropoff minus pickup, converted from seconds to hours
# to_timestamp first normalises the raw columns to Spark's timestamp type so the
# LongType cast is valid.
pickup_epoch_s = F.to_timestamp(F.col("pickup_datetime")).cast(LongType())
dropoff_epoch_s = F.to_timestamp(F.col("dropoff_datetime")).cast(LongType())

df_fhv = df_fhv.withColumns({
    "pickup_date": F.to_date(F.col("pickup_datetime")),
    "pickup_datetime_seconds": pickup_epoch_s,
    "dropoff_datetime_seconds": dropoff_epoch_s,
    "trip_duration_hours": (dropoff_epoch_s - pickup_epoch_s) / 3600,
})

# Expose the enriched frame to Spark SQL under the name `fhv`
df_fhv.createOrReplaceTempView('fhv')

## Question 3: Count Records
### How many trips were there on the 15th of October?
There were 62629 trips on October 15, 2019. 

In [9]:
# Question 3: number of trips whose pickup timestamp falls on 15 October 2019
(
    spark.table("fhv")
    .filter(
        (F.dayofmonth("pickup_datetime") == 15)
        & (F.month("pickup_datetime") == 10)
        & (F.year("pickup_datetime") == 2019)
    )
    .agg(F.count(F.lit(1)).alias("num_trips_oct_15"))
    .show()
)

+----------------+
|num_trips_oct_15|
+----------------+
|           62629|
+----------------+



## Question 4: Longest trip for each day
### What is the length of the longest trip in the dataset in hours?

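The longest trip in the dataset is 631,152.50 hours long (roughly 72 years). A duration like this is clearly a data-quality artifact, most likely an erroneous dropoff timestamp, rather than a real trip.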

In [10]:
# Question 4: longest trip (in hours) for each pickup date, longest first;
# the first row is the longest trip in the whole dataset
(
    spark.table("fhv")
    .groupBy("pickup_date")
    .agg(F.max("trip_duration_hours").alias("daily_max_trip_duration_hours"))
    .orderBy(F.col("daily_max_trip_duration_hours").desc())
    .show()
)

+-----------+-----------------------------+
|pickup_date|daily_max_trip_duration_hours|
+-----------+-----------------------------+
| 2019-10-28|                     631152.5|
| 2019-10-11|                     631152.5|
| 2019-10-31|            87672.44083333333|
| 2019-10-01|            70128.02805555555|
| 2019-10-17|                       8794.0|
| 2019-10-26|            8784.166666666666|
| 2019-10-30|           1465.5344444444445|
| 2019-10-25|           1057.8266666666666|
| 2019-10-02|            770.2313888888889|
| 2019-10-23|            746.6166666666667|
| 2019-10-03|                     746.3825|
| 2019-10-04|            745.6166666666667|
| 2019-10-07|            745.1666666666666|
| 2019-10-05|            698.1808333333333|
| 2019-10-06|            675.0077777777777|
| 2019-10-08|            626.0822222222222|
| 2019-10-16|            605.0666666666667|
| 2019-10-09|            602.3102777777777|
| 2019-10-10|            578.3888888888889|
| 2019-10-12|                   

## Question 5: User Interface
### What local port does Spark's User Interface run on?
Spark's User Interface runs on local port 4040 by default. 

## Question 6: Least frequent pickup location zone
### What is the name of the least frequent pickup zone?
Jamaica Bay is the pickup zone with the fewest trips (a single trip in October 2019).

In [11]:
# Question 6: pickup zones ordered from least to most frequent.
# - The INNER JOIN plus the IS NOT NULL filter drop trips whose PULocationID is
#   missing or has no zone in the lookup. A LEFT JOIN would collapse all of those
#   into a single NULL "zone" that could be reported as the least frequent one.
# - Grouping by the zone name counts LocationIDs that share a name together.
# - The secondary sort on Zone makes the order deterministic when zones tie on num_trips.
spark.sql("""
SELECT
    l.Zone,
    COUNT(*) AS num_trips
FROM fhv AS f
INNER JOIN taxi_lookup AS l
    ON f.PULocationID = l.LocationID
WHERE l.Zone IS NOT NULL
GROUP BY l.Zone
ORDER BY num_trips ASC, l.Zone ASC
""").show()

+--------------------+---------+
|                Zone|num_trips|
+--------------------+---------+
|         Jamaica Bay|        1|
|Governor's Island...|        2|
| Green-Wood Cemetery|        5|
|       Broad Channel|        8|
|     Highbridge Park|       14|
|        Battery Park|       15|
|Saint Michaels Ce...|       23|
|Breezy Point/Fort...|       25|
|Marine Park/Floyd...|       26|
|        Astoria Park|       29|
|    Inwood Hill Park|       39|
|       Willets Point|       47|
|Forest Park/Highl...|       53|
|  Brooklyn Navy Yard|       57|
|        Crotona Park|       62|
|        Country Club|       77|
|     Freshkills Park|       89|
|       Prospect Park|       98|
|     Columbia Street|      105|
|  South Williamsburg|      110|
+--------------------+---------+
only showing top 20 rows

