In [65]:
from pathlib import Path

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
import pyspark.sql.functions as F

In [66]:
# Report the installed PySpark version. The `spark` session is only created in a
# later cell, so `spark.version` here raises NameError on a fresh kernel
# (Restart & Run All); the package-level version needs no session.
pyspark.__version__

'3.3.1'

In [None]:
# Local Spark session using every available core; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

In [31]:
# Explicit schema for the HVFHV trip CSV: datetimes as TimestampType, location
# IDs as IntegerType, everything nullable. Fields must be listed in the same
# order as the CSV columns, because Spark applies a CSV schema positionally.
# NOTE(review): the last column holds base numbers (e.g. B02764) in the shown
# output, which looks like an affiliated base number rather than an HVFHS
# license number — confirm the column name against the file header.
_trip_columns = [
    ("dispatching_base_num", types.StringType()),
    ("pickup_datetime", types.TimestampType()),
    ("dropoff_datetime", types.TimestampType()),
    ("PULocationID", types.IntegerType()),
    ("DOLocationID", types.IntegerType()),
    ("SR_Flag", types.StringType()),
    ("hvfhs_license_num", types.StringType()),
]
schema = types.StructType(
    [types.StructField(name, data_type, nullable=True) for name, data_type in _trip_columns]
)

In [None]:
# Show the kernel's working directory; the CSV and Parquet paths used in this
# notebook are relative to it. Pure Python, so it also works where `pwd` does not.
Path.cwd()

In [29]:
# Read the raw June 2021 HVFHV trips using the explicit `schema` defined earlier.
# Without .schema() the CSV reader types every column as a string (inferSchema is
# off by default), so the LongType casts used for the trip-duration question
# would not yield epoch seconds. With header=true the header row is skipped and
# the schema's field names are applied positionally (enforceSchema defaults to true).
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv("fhvhv_tripdata_2021-06.csv")
)

In [33]:
# Preview the first rows; long cell values are truncated for display.
df.show(n=20, truncate=True)

+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|hvfhs_license_num|
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|           B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|           B02764|
|              B02764|2021-06-01 00:27:01|2021-06-01 00:42:11|         240|         127|      N|           B02764|
|              B02764|2021-06-01 00:46:08|2021-06-01 00:53:45|         127|         235|      N|           B02764|
|              B02510|2021-06-01 00:45:42|2021-06-01 01:03:33|         144|         146|      N|             null|
|              B02510|2021-06-01 00:18:15|2021-06-01 00:25:47|          49|     

In [34]:
# Question 2: Repartition the June 2021 HVFHV data into 12 partitions and save it
# to Parquet. Each partition becomes one Parquet part-file on write.
df = df.repartition(numPartitions=12)


In [36]:
# Write the repartitioned trips to Parquet, replacing any previous output.
df.write.mode("overwrite").parquet("fhv_parq/")

In [None]:
# Question 2 (cont.): What is the average size of the Parquet files?
# Recorded answer: ~24 MB. Recomputed here from the part-files written above so the
# answer is reproducible rather than hand-copied; Spark's _SUCCESS/.crc side files
# are excluded by the *.parquet pattern. None means no Parquet files were found.
parquet_sizes_mb = [
    path.stat().st_size / 1024**2 for path in Path("fhv_parq").glob("*.parquet")
]
avg_parquet_size_mb = (
    sum(parquet_sizes_mb) / len(parquet_sizes_mb) if parquet_sizes_mb else None
)
avg_parquet_size_mb

In [38]:
# Question 3: How many taxi trips were started on June 15th?
# Truncate each pickup timestamp to its calendar date and count the matching rows.
pickup_date = F.to_date(F.col("pickup_datetime"))
df.where(pickup_date == "2021-06-15").count()

452470

In [47]:
# Question 4: How long is the longest trip in the dataset?
# Casting a timestamp to LongType gives seconds since the epoch, so the
# difference divided by 3600 is the trip duration in hours.
trip_hours = (
    F.col("dropoff_datetime").cast(types.LongType())
    - F.col("pickup_datetime").cast(types.LongType())
) / 3600
(
    df.withColumn("trip_time_hour", trip_hours)
    .orderBy(F.col("trip_time_hour").desc())
    .show(5, False)
)

+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+------------------+
|dispatching_base_num|pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|SR_Flag|hvfhs_license_num|trip_time_hour    |
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+------------------+
|B02872              |2021-06-25 13:55:41|2021-06-28 08:48:25|98          |265         |N      |B02872           |66.8788888888889  |
|B02765              |2021-06-22 12:09:45|2021-06-23 13:42:44|188         |198         |N      |B02765           |25.549722222222222|
|B02879              |2021-06-27 10:32:29|2021-06-28 06:31:20|78          |169         |N      |B02879           |19.980833333333333|
|B02800              |2021-06-26 22:37:11|2021-06-27 16:49:01|263         |36          |N      |null             |18.197222222222223|
|B02682              |2021-06-23 20:40:43|2021-06-24 13:08:44|

In [None]:
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+------------------+
|dispatching_base_num|pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|SR_Flag|hvfhs_license_num|trip_time_hour    |
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+------------------+
|B02872              |2021-06-25 13:55:41|2021-06-28 08:48:25|98          |265         |N      |B02872           |66.8788888888889  |
|B02765              |2021-06-22 12:09:45|2021-06-23 13:42:44|188         |198         |N      |B02765           |25.549722222222222|
|B02879              |2021-06-27 10:32:29|2021-06-28 06:31:20|78          |169         |N      |B02879           |19.980833333333333|
|B02800              |2021-06-26 22:37:11|2021-06-27 16:49:01|263         |36          |N      |null             |18.197222222222223|
|B02682              |2021-06-23 20:40:43|2021-06-24 13:08:44|3           |247         |N      |B02682           |16.466944444444444|
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+------------------+

In [49]:
# Question 6: What is the name of the most frequent pickup location zone?
# Load the taxi zone lookup table; the header row supplies the column names.
df_zones = spark.read.csv("taxi_zone_lookup.csv", header=True)

In [50]:
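# df_zones.registerTempTable('zones')  # SQL alternative (unused); registerTempTable is deprecated since Spark 2.0 — use df_zones.createOrReplaceTempView('zones')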




In [58]:
# Attach the pickup zone's lookup columns (LocationID, Borough, Zone,
# service_zone) to every trip. A left join keeps trips whose PULocationID has no
# lookup row (their zone columns are null). The broadcast hint ships the lookup
# table to every executor; presumably it is small — confirm if the lookup grows.
# The guard keeps this cell idempotent: `df` is rebound in place, so re-running
# the join would add a second copy of the zone columns and make "Zone"
# ambiguous in the aggregation that follows.
if "Zone" not in df.columns:
    df = df.join(F.broadcast(df_zones), df.PULocationID == df_zones.LocationID, "left")

In [None]:
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|hvfhs_license_num|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+----------+---------+--------------------+------------+
|              B02871|2021-06-20 13:22:11|2021-06-20 15:19:40|         132|         265|      N|           B02871|       132|   Queens|         JFK Airport|    Airports|
|              B02872|2021-06-19 20:18:20|2021-06-19 20:50:56|          26|          76|      N|           B02872|        26| Brooklyn|        Borough Park|   Boro Zone|
|              B02765|2021-06-19 22:48:22|2021-06-19 23:13:12|         132|         155|      N|           B02765|       132|   Queens|         JFK Airport|    Airports|
|              B02876|2021-06-20 09:44:13|2021-06-20 09:58:18|         145|           7|      N|           B02876|       145|   Queens|Long Island City/...|   Boro Zone|
|              B02510|2021-06-19 19:23:07|2021-06-19 19:34:46|          80|         198|      N|             null|        80| Brooklyn|   East Williamsburg|   Boro Zone|
|              B02877|2021-06-19 21:13:53|2021-06-19 21:21:32|          61|          72|      N|           B02877|        61| Brooklyn| Crown Heights North|   Boro Zone|
|              B02510|2021-06-20 06:33:01|2021-06-20 06:52:20|         230|         138|      N|             null|       230|Manhattan|Times Sq/Theatre ...| Yellow Zone|
|              B02884|2021-06-20 02:27:43|2021-06-20 02:36:12|         165|          22|      N|           B02884|       165| Brooklyn|             Midwood|   Boro Zone|
|              B02510|2021-06-19 23:04:58|2021-06-19 23:28:30|          68|         232|      N|             null|        68|Manhattan|        East Chelsea| Yellow Zone|
|              B02872|2021-06-20 10:04:23|2021-06-20 10:16:06|         186|          13|      N|           B02872|       186|Manhattan|Penn Station/Madi...| Yellow Zone|
|              B02510|2021-06-20 08:12:15|2021-06-20 08:20:41|         197|         134|      N|             null|       197|   Queens|       Richmond Hill|   Boro Zone|
|              B02872|2021-06-20 00:35:43|2021-06-20 00:56:47|         246|         265|      N|           B02872|       246|Manhattan|West Chelsea/Huds...| Yellow Zone|
|              B02883|2021-06-20 12:54:13|2021-06-20 13:02:54|         150|         108|      N|           B02883|       150| Brooklyn|     Manhattan Beach|   Boro Zone|
|              B02510|2021-06-19 19:10:20|2021-06-19 19:27:58|          68|          79|      N|             null|        68|Manhattan|        East Chelsea| Yellow Zone|
|              B02878|2021-06-19 20:22:39|2021-06-19 20:52:02|          41|         230|      N|           B02878|        41|Manhattan|      Central Harlem|   Boro Zone|
|              B02864|2021-06-20 03:54:15|2021-06-20 04:18:12|         137|         265|      N|           B02864|       137|Manhattan|            Kips Bay| Yellow Zone|
|              B02878|2021-06-19 19:20:00|2021-06-19 19:27:33|         100|          48|      N|           B02878|       100|Manhattan|    Garment District| Yellow Zone|
|              B02510|2021-06-20 00:05:07|2021-06-20 00:16:03|         138|         260|      N|             null|       138|   Queens|   LaGuardia Airport|    Airports|
|              B02510|2021-06-20 00:58:40|2021-06-20 01:14:00|         124|          91|      N|             null|       124|   Queens|        Howard Beach|   Boro Zone|
|              B02872|2021-06-19 22:07:42|2021-06-19 22:32:54|         237|         148|      N|           B02872|       237|Manhattan|Upper East Side S...| Yellow Zone|
+--------------------+-------------------+-------------------+------------+------------+-------+-----------------+----------+---------+--------------------+------------+

In [63]:
# Count trips per pickup zone and display only the busiest one.
zone_counts = df.groupBy("Zone").count()
zone_counts.orderBy("count", ascending=False).limit(1).show()

+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+



In [None]:
+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+