In [2]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('test') \
    .getOrCreate()

Question #1

In [5]:
spark.version

'3.3.2'

In [115]:
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2019-10.parquet
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

In [55]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

csv = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('./fhv_tripdata_2019-10.csv.gz')

In [67]:
csv.show()
csv.schema

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [60]:
csv \
    .repartition(6) \
    .write.parquet('data/', mode='overwrite')

Question #2

In [110]:
!ls ./data -al

total 39381
drwxr-xr-x 1 Ian Demavivas 197609       0 Feb 26 12:24 .
drwxr-xr-x 1 Ian Demavivas 197609       0 Feb 26 12:55 ..
-rw-r--r-- 1 Ian Demavivas 197609       8 Feb 26 12:24 ._SUCCESS.crc
-rw-r--r-- 1 Ian Demavivas 197609   52124 Feb 26 12:24 .part-00000-3961eda0-a086-4173-9822-aeb8e101182c-c000.snappy.parquet.crc
-rw-r--r-- 1 Ian Demavivas 197609   52040 Feb 26 12:24 .part-00001-3961eda0-a086-4173-9822-aeb8e101182c-c000.snappy.parquet.crc
-rw-r--r-- 1 Ian Demavivas 197609   52084 Feb 26 12:24 .part-00002-3961eda0-a086-4173-9822-aeb8e101182c-c000.snappy.parquet.crc
-rw-r--r-- 1 Ian Demavivas 197609   52008 Feb 26 12:24 .part-00003-3961eda0-a086-4173-9822-aeb8e101182c-c000.snappy.parquet.crc
-rw-r--r-- 1 Ian Demavivas 197609   52076 Feb 26 12:24 .part-00004-3961eda0-a086-4173-9822-aeb8e101182c-c000.snappy.parquet.crc
-rw-r--r-- 1 Ian Demavivas 197609   52100 Feb 26 12:24 .part-00005-3961eda0-a086-4173-9822-aeb8e101182c-c000.snappy.parquet.crc
-rw-r--r-- 1 Ian Demavivas 197609   

In [61]:
df = spark.read.parquet('data/*')

In [63]:
table = df.createOrReplaceTempView('table')

Question #3

In [65]:
Q3 = spark.sql("""
    SELECT COUNT(*) as no_taxi_trips
    FROM table
    WHERE pickup_datetime >= '2019-10-15 00:00:00' AND pickup_datetime < '2019-10-16 00:00:00'
""").show()

+-------------+
|no_taxi_trips|
+-------------+
|        62610|
+-------------+



Question #4

In [113]:
Q4 = spark.sql("""
SELECT 
    pickup_datetime,
    dropOff_datetime,
    TIMESTAMPDIFF(MINUTE, pickup_datetime, dropOff_datetime)/60 AS trip_duration
FROM table
ORDER BY trip_duration DESC
LIMIT 1
""").show()

+-------------------+-------------------+-------------+
|    pickup_datetime|   dropOff_datetime|trip_duration|
+-------------------+-------------------+-------------+
|2019-10-11 18:00:00|2091-10-11 18:30:00|     631152.5|
+-------------------+-------------------+-------------+



In [116]:
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

In [112]:
zone_df = spark.read \
    .option("header", "true") \
    .csv('./taxi_zone_lookup.csv')

zone_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [71]:
df_join = df.join(zone_df, df.PULocationID == zone_df.LocationID, how='inner')

In [78]:
df_join.show()
#df_join.schema

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|              B02111|2019-10-27 05:35:21|2019-10-27 05:38:04|          92|          80|   null|                B02111|        92|   Queens|            Flushing|   Boro Zone|
|              B01437|2019-10-08 09:41:27|2019-10-08 09:46:52|         264|         197|   null|                B01437|       264|  Unknown|                  NV|         N/A|
|              B02107|2019-10-09 16:53:55|2019-10-09 17:06:05|         264|         167|   null|                B02107|      

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True), StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [73]:
joined_table = df_join.createOrReplaceTempView('joined')

Question #6

In [114]:
Q6 = spark.sql("""
SELECT 
    COUNT(*) as no_of_pickups,
    PULocationID,
    Zone
FROM joined
WHERE Borough != 'Unknown'
GROUP BY 2, 3
ORDER BY 1 ASC
LIMIT 1
""").show()

+-------------+------------+-----------+
|no_of_pickups|PULocationID|       Zone|
+-------------+------------+-----------+
|            1|           2|Jamaica Bay|
+-------------+------------+-----------+

