In [21]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Check the spark version

In [25]:
spark.version

'3.3.2'

Read the fhv_tripdata_2019-10.csv file

In [26]:
from pyspark.sql import types

In [28]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

In [29]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhv_tripdata_2019-10.csv')

Repartition the dataframe

In [30]:
df = df.repartition(6)

Save the dataframe into parquet 

In [36]:
df.write.parquet('fhv_tripdata/2019/10/')

                                                                                

Calculate the average size of the parquet file 

In [40]:
import os

path = './fhv_tripdata/2019/10'
avg_size = 0
count = 0
for file in os.listdir(path):
    if file.endswith('.parquet'):
        avg_size+=int(os.path.getsize(os.path.join(path,file)))
        count+=1

print('Size in MB')
print((avg_size/1000000)/count)

Size in MB
6.209669833333333


In [41]:
df = spark.read.parquet(path)

In [42]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



Count the total of trips in October 15th

In [45]:
df.registerTempTable('fhv_2019_10')



In [50]:
spark.sql(
"""
SELECT COUNT(*) FROM fhv_2019_10
WHERE EXTRACT(MONTH FROM pickup_datetime)=10 AND EXTRACT(DAY FROM pickup_datetime)=15
"""
).show()



+--------+
|count(1)|
+--------+
|   62610|
+--------+



                                                                                

Calculate the longest trip in the dataset

In [53]:
from pyspark.sql import functions as F

In [61]:
df = df.withColumn('time',F.round((F.unix_timestamp('dropoff_datetime')-F.unix_timestamp('pickup_datetime'))/3600,2))

In [63]:
df.select(F.max('time')).show()

+---------+
|max(time)|
+---------+
| 631152.5|
+---------+



Load the zone file

In [65]:
zone_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

In [70]:
zone_df = spark.read \
    .option("header", "true") \
    .schema(zone_schema) \
    .csv('taxi_zone_lookup.csv')

In [71]:
zone_df.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [76]:
df.select('*').show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|time|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----+
|              B02111|2019-10-01 10:18:42|2019-10-01 10:28:28|          92|          92|   null|                B02111|0.16|
|     B00021         |2019-10-01 23:08:47|2019-10-01 23:15:57|          56|          82|   null|       B00021         |0.12|
|              B03160|2019-10-02 08:29:00|2019-10-02 08:50:00|          47|         254|   null|                B00111|0.35|
|              B00692|2019-10-03 08:32:24|2019-10-03 09:01:26|         264|          22|   null|                B00692|0.48|
|              B01145|2019-10-01 01:07:53|2019-10-01 01:15:55|         264|         147|   null|                B01145|0.13|


In [None]:
df.withColumn('pickup_location')

In [74]:
zone_df.select('*').show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [78]:
zone_df = zone_df.withColumnRenamed('LocationID','PULocationID')

Join the fhv_tripdata_2019-10 with the taxi_zone_lookup

In [122]:
df_with_zones = df.join(zone_df,df.PULocationID==zone_df.PULocationID)

In [123]:
df_with_zones.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----+------------+-------+------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|time|PULocationID|Borough|              Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----+------------+-------+------------------+------------+
|              B02111|2019-10-01 10:18:42|2019-10-01 10:28:28|          92|          92|   null|                B02111|0.16|          92| Queens|          Flushing|   Boro Zone|
|     B00021         |2019-10-01 23:08:47|2019-10-01 23:15:57|          56|          82|   null|       B00021         |0.12|          56| Queens|            Corona|   Boro Zone|
|              B03160|2019-10-02 08:29:00|2019-10-02 08:50:00|          47|         254|   null|              

The least frequent pickup location Zone

In [130]:
grouped_df_by_zone = df_with_zones.groupBy('Zone').count().orderBy('count',ascending=True).limit(1).show()

+-----------+-----+
|       Zone|count|
+-----------+-----+
|Jamaica Bay|    1|
+-----------+-----+

