Import relevant libraries

In [7]:
import pyspark
from pyspark.sql import SparkSession, types

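Let's create a Spark session with the application name 'homework'. A `SparkSession` is the unified entry point of a Spark application: it is how we read data into DataFrames, run SQL queries and access the rest of Spark's functionality. `master("local[*]")` runs Spark locally with as many worker threads as there are CPU cores.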

In [2]:
spark = SparkSession.builder.master("local[*]").appName('homework').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/05 14:52:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Let's now download the dataset we'll be working with: the NYC TLC high-volume for-hire vehicle (FHVHV) trip records for February 2021.

In [3]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-05 14:53:31--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.89.11
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.89.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-03-05 14:55:34 (5.69 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



Next up, we'll read our data into a Spark DataFrame, using the first row of the CSV as column names.

In [4]:
df = spark.read.option("header", "true").csv("fhvhv_tripdata_2021-02.csv")


In [5]:
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

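As the output shows, without a schema every column is read as a string. Let's define an explicit schema and re-create our DataFrame with it, so that the pickup and dropoff times become timestamps and the location IDs become integers. The third argument of each `StructField`, `True`, means that the field is nullable.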

In [10]:
schema = types.StructType([
         types.StructField('hvfhs_license_num', types.StringType(), True),
         types.StructField('dispatching_base_num', types.StringType(), True),
         types.StructField('pickup_datetime', types.TimestampType(), True),
         types.StructField('dropoff_datetime', types.TimestampType(), True),
         types.StructField('PULocationID', types.IntegerType(), True),
         types.StructField('DOLocationID', types.IntegerType(), True),
         types.StructField('SR_Flag', types.StringType(), True)
])

In [11]:
df = spark.read.option("header", "true").schema(schema).csv("fhvhv_tripdata_2021-02.csv")

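Now let's repartition the DataFrame into 24 partitions. `repartition` shuffles the rows roughly evenly across the partitions, so that when we write it out, the Parquet output is split into 24 files that can be written (and later read) in parallel.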

In [12]:
df = df.repartition(24)

Now let's write the partitions out in Parquet format, one file per partition.

In [13]:
df.write.parquet('fhvhv/2021/02/')

22/03/05 15:07:51 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,633 bytes) of heap memory
Scaling row group sizes to 97.70% for 7 writers
22/03/05 15:07:51 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,633 bytes) of heap memory
Scaling row group sizes to 85.49% for 8 writers
22/03/05 15:07:51 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,633 bytes) of heap memory
Scaling row group sizes to 75.99% for 9 writers
22/03/05 15:07:51 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,633 bytes) of heap memory
Scaling row group sizes to 68.39% for 10 writers
22/03/05 15:07:56 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,633 bytes) of heap memory
Scaling row group sizes to 75.99% for 9 writers
22/03/05 15:07:56 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,633 bytes) of heap memory
Scaling row group sizes to 85.49% for 8 writers
22/03/05 15:07:56 WARN MemoryManager: Total allocation exceeds 95.00% (917,949,63

Now let's read these Parquet files back into a Spark DataFrame.

In [14]:
df = spark.read.parquet('fhvhv/2021/02/')

In [74]:
df.count()

11613942

In [16]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



How much disk space do the Parquet files take compared with the CSV file?

In [18]:
!du -h

  0B	./.ipynb_checkpoints
218M	./fhvhv/2021/02
218M	./fhvhv/2021
218M	./fhvhv
923M	.
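The Parquet copy in `fhvhv/2021/02` takes 218M, compared with 733,822,658 bytes (700M, as reported by `wget`) for the original CSV. That is roughly a third of the size. If `du` isn't available, the plain-Python sketch below adds up the file sizes under a folder instead. It reports the apparent size of the files, which can differ slightly from the disk usage that `du` shows.

```python
import os
import tempfile


def dir_size_bytes(path):
    """Total apparent size in bytes of all files under `path`."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total


# Quick check on a throwaway folder holding files of 100 and 250 bytes
demo_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_dir, "sub"))
with open(os.path.join(demo_dir, "a.bin"), "wb") as f:
    f.write(b"x" * 100)
with open(os.path.join(demo_dir, "sub", "b.bin"), "wb") as f:
    f.write(b"y" * 250)

demo_total = dir_size_bytes(demo_dir)
print(demo_total)  # 350
```

Against the notebook's files, you would compare `dir_size_bytes('fhvhv/2021/02')` with `os.path.getsize('fhvhv_tripdata_2021-02.csv')`.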


In [19]:
df.head()

Row(hvfhs_license_num='HV0005', dispatching_base_num='B02510', pickup_datetime=datetime.datetime(2021, 2, 2, 22, 6, 25), dropoff_datetime=datetime.datetime(2021, 2, 2, 22, 13, 17), PULocationID=126, DOLocationID=147, SR_Flag=None)

How many trips started on `2021-02-15`?

In [83]:
df.filter(df['pickup_datetime'].like('%2021-02-15%')).count()


367170

Let's register the DataFrame as a temporary view so we can run SQL queries on it.

In [84]:
# Expose the DataFrame to spark.sql() under the name 'temp_table'
df.createOrReplaceTempView('temp_table')



What day had the longest trip?

In [109]:
# Trip duration is dropoff minus pickup; keep only the single longest trip
df_longest_trip = spark.sql("""
    SELECT pickup_datetime, (dropoff_datetime - pickup_datetime)
    FROM temp_table
    ORDER BY 2 DESC
    LIMIT 1
""")

In [110]:
df_longest_trip.show()


+-------------------+------------------------------------+
|    pickup_datetime|(dropoff_datetime - pickup_datetime)|
+-------------------+------------------------------------+
|2021-02-11 13:40:44|                INTERVAL '0 20:59...|
+-------------------+------------------------------------+




What's the most frequent `dispatching_base_num`?

In [115]:
df_base_num = spark.sql("""
    SELECT dispatching_base_num, COUNT(*)
    FROM temp_table
    GROUP BY dispatching_base_num
""")

In [116]:
df_base_num.show()



+--------------------+--------+
|dispatching_base_num|count(1)|
+--------------------+--------+
|              B02876|  215693|
|              B03136|    1741|
|              B02877|  198938|
|              B02869|  429720|
|              B02883|  251617|
|              B02835|  189031|
|              B02884|  244963|
|              B02880|  115716|
|              B02878|  305185|
|              B02836|  128978|
|              B02872|  882689|
|              B02512|   41043|
|              B02867|  200530|
|              B02866|  311089|
|              B02871|  312364|
|              B02889|  138762|
|              B02844|    3502|
|              B02510| 3233664|
|              B02888|  169167|
|              B02682|  303255|
+--------------------+--------+
only showing top 20 rows




What's the most common pickup / dropoff pair?

In [167]:
df_location_pair = spark.sql("""
    SELECT CONCAT(PULocationID, ' / ', PULocationID), COUNT(*)
    FROM temp_table
    GROUP BY CONCAT(PULocationID, ' / ', PULocationID)
    ORDER BY 2 DESC
""")

In [168]:
df_location_pair.show(100, False)



+---------------------------------------+--------+
|concat(PULocationID,  / , PULocationID)|count(1)|
+---------------------------------------+--------+
|61 / 61                                |203777  |
|76 / 76                                |166959  |
|37 / 37                                |140636  |
|79 / 79                                |137901  |
|42 / 42                                |137246  |
|17 / 17                                |125394  |
|225 / 225                              |120026  |
|39 / 39                                |117751  |
|188 / 188                              |117734  |
|89 / 89                                |117092  |
|244 / 244                              |115590  |
|7 / 7                                  |110787  |
|231 / 231                              |109074  |
|74 / 74                                |103440  |
|181 / 181                              |103267  |
|35 / 35                                |103059  |
|234 / 234                     


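Note that the query above concatenates `PULocationID` with itself, so its counts are per pickup location rather than per pickup/dropoff pair; the second half of the `CONCAT` should use `DOLocationID`. We also want to see location names rather than IDs. For this, we'll download the taxi zone lookup table and join it to the trip data twice: once for the pickup zone and once for the dropoff zone.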

In [169]:
! wget 'https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv'

--2022-03-05 16:51:20--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.166.48
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.166.48|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-03-05 16:51:21 (45.9 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [170]:
df_lookup = spark.read.option("header", "true").csv("taxi+_zone_lookup.csv")

In [173]:
df_lookup.schema

StructType(List(StructField(LocationID,StringType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [174]:
lookup_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])

In [175]:
df_lookup = spark.read.option("header", "true").schema(lookup_schema).csv("taxi+_zone_lookup.csv")

In [176]:
df_lookup.write.parquet('fhvhv/lookup/')


In [177]:
df_lookup = spark.read.parquet('fhvhv/lookup/')

In [178]:
df_lookup.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [179]:
df_lookup.schema

StructType(List(StructField(LocationID,IntegerType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [183]:
# Expose the lookup DataFrame to spark.sql() under the name 'temp_table_lookup'
df_lookup.createOrReplaceTempView('temp_table_lookup')



In [207]:
joined = spark.sql("""
    SELECT CONCAT(pu.Zone, ' / ', do.Zone), COUNT(*)
    FROM temp_table tt
    LEFT JOIN temp_table_lookup pu ON pu.LocationID = tt.PULocationID
    LEFT JOIN temp_table_lookup do ON do.LocationID = tt.DOLocationID
    GROUP BY CONCAT(pu.Zone, ' / ', do.Zone)
    ORDER BY 2 DESC
""")

In [208]:
joined.show(150, False)



+-----------------------------------------------------------+--------+
|concat(Zone,  / , Zone)                                    |count(1)|
+-----------------------------------------------------------+--------+
|East New York / East New York                              |45041   |
|Borough Park / Borough Park                                |37329   |
|Canarsie / Canarsie                                        |28026   |
|Crown Heights North / Crown Heights North                  |25976   |
|Bay Ridge / Bay Ridge                                      |17934   |
|Astoria / Astoria                                          |14688   |
|Jackson Heights / Jackson Heights                          |14688   |
|Central Harlem North / Central Harlem North                |14481   |
|Bushwick South / Bushwick South                            |14424   |
|Flatbush/Ditmas Park / Flatbush/Ditmas Park                |13976   |
|South Ozone Park / South Ozone Park                        |13716   |
|Brown
