In [1]:
import findspark
findspark.init()

import pyspark

### Question 1 
Execute spark.version What's the output? Spark version used

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
spark.version

'3.0.3'

### Question 2
We will use this dataset for all the remaining questions.
Repartition it to 12 partitions and save it to parquet.
What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.

In [None]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField("dispatching_base_num", types.StringType(), True),
    types.StructField("pickup_datetime", types.TimestampType(), True),
    types.StructField("dropoff_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("SR_Flag", types.StringType(), True),
    types.StructField("Affiliated_base_number", types.StringType(), True),
])

In [7]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhvhv_tripdata_2021-06.csv.gz')
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|                  null|
|              B02510|2021-06-01 00:18:1

In [8]:
df.schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(PULocationID,IntegerType,true),StructField(DOLocationID,IntegerType,true),StructField(SR_Flag,StringType,true),StructField(Affiliated_base_number,StringType,true)))

In [None]:
df \
    .repartition(12) \
    .write.parquet("data/pq/fhvhv/2021/06")

__Result__: 24 MB

### Question 3
How many taxi trips were there on June 15? Consider only trips that started on June 15.

In [20]:
df.registerTempTable('fhvhv')

In [12]:
df_count = spark.sql("""
SELECT 
    date_trunc('date', pickup_datetime) AS date, 
    COUNT(1) AS number_records
FROM
    fhvhv
WHERE
    pickup_datetime >= '2021-06-15 00:00:00'
    and pickup_datetime < '2021-06-16 00:00:00'
GROUP BY
    1
ORDER BY
    1
""")
df_count.show()

+----+--------------+
|date|number_records|
+----+--------------+
|null|        452470|
+----+--------------+



### Question 4
Now calculate the duration for each trip.
How long was the longest trip in Hours?

In [18]:
from pyspark.sql.functions import *
df = df.withColumn(
    'Travel_Hours',
    ( unix_timestamp(df['dropoff_datetime']) - unix_timestamp(df['pickup_datetime']) )/ 3600
)
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|       Travel_Hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|0.08472222222222223|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|0.08277777777777778|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|                B02764|0.25277777777777777|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|                B02764|0.12694444444444444|
|              B0251

In [22]:
df_max_travel_hours = df.agg({"Travel_Hours": "max"}).collect()[0][0]
df_max_travel_hours

66.8788888888889

### Question 6
Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

In [23]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [26]:
df.count()

14961892

In [29]:
df_PU_locations = df.join(df_zones, df.PULocationID == df_zones.LocationID).select("Borough", "Zone", "service_zone")
df_PU_locations.show()

+---------+--------------------+------------+
|  Borough|                Zone|service_zone|
+---------+--------------------+------------+
|    Bronx|             Norwood|   Boro Zone|
|    Bronx|           Bronxdale|   Boro Zone|
|    Bronx|  Van Cortlandt Park|   Boro Zone|
|Manhattan|              Inwood|   Boro Zone|
|Manhattan| Little Italy/NoLiTa| Yellow Zone|
| Brooklyn|        Clinton Hill|   Boro Zone|
| Brooklyn|        Clinton Hill|   Boro Zone|
| Brooklyn|  Stuyvesant Heights|   Boro Zone|
|Manhattan|             Seaport| Yellow Zone|
| Brooklyn|   East Williamsburg|   Boro Zone|
| Brooklyn|  South Williamsburg|   Boro Zone|
| Brooklyn|             Bedford|   Boro Zone|
| Brooklyn|      Sheepshead Bay|   Boro Zone|
|   Queens|           Sunnyside|   Boro Zone|
|   Queens|      Queens Village|   Boro Zone|
|   Queens|             Bayside|   Boro Zone|
|    Bronx|         Parkchester|   Boro Zone|
| Brooklyn|Prospect-Lefferts...|   Boro Zone|
| Brooklyn|            Canarsie|  

In [30]:
df_PU_locations.registerTempTable('pu_locations')

In [33]:
df_PU_frequency = spark.sql("""
SELECT 
    Zone,
    COUNT(1) AS number_records
FROM
    pu_locations
GROUP BY
    1
ORDER BY
    2 DESC
""")
df_PU_frequency.show()

+--------------------+--------------+
|                Zone|number_records|
+--------------------+--------------+
| Crown Heights North|        231279|
|        East Village|        221244|
|         JFK Airport|        188867|
|      Bushwick South|        187929|
|       East New York|        186780|
|TriBeCa/Civic Center|        164344|
|   LaGuardia Airport|        161596|
|            Union Sq|        158937|
|        West Village|        154698|
|             Astoria|        152493|
|     Lower East Side|        151020|
|        East Chelsea|        147673|
|Central Harlem North|        146402|
|Williamsburg (Nor...|        143683|
|          Park Slope|        143594|
|  Stuyvesant Heights|        141427|
|        Clinton East|        139611|
|West Chelsea/Huds...|        139431|
|             Bedford|        138428|
|         Murray Hill|        137879|
+--------------------+--------------+
only showing top 20 rows

