# Week 5 Homework

In this homework, I will put what I've learned about Spark into practice.

I'll use the High Volume For-Hire Vehicles (HVFHV) dataset from the New York City Taxi and Limousine Commission (TLC).

## Question 1. Install Spark and PySpark

* Install Spark
* Run PySpark
* Create a local Spark session.
* Execute `spark.version`

What's the output?

In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .master("local[*]") \
            .appName("HW5") \
            .getOrCreate()

22/03/12 10:06:29 WARN Utils: Your hostname, FONG-DEV-NUC resolves to a loopback address: 127.0.1.1; using 192.168.128.160 instead (on interface eth0)
22/03/12 10:06:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/03/12 10:06:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.version

'3.0.3'

## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

`curl -O https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv`

Read it with Spark using the same schema as we did in the lessons.  We'll use this dataset for all the remaining questions.

Repartition it to 24 partitions and save it in Parquet format.

What is the size of the folder (in MB)?

In [3]:
!curl -O https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  699M  100  699M    0     0  11.6M      0  0:00:59  0:00:59 --:--:-- 29.6M


In [2]:
df = spark.read \
        .option("header", "true") \
        .csv("fhvhv_tripdata_2021-02.csv")

In [3]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [4]:
df.schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,StringType,true),StructField(DOLocationID,StringType,true),StructField(SR_Flag,StringType,true)))

In [5]:
from pyspark.sql import types

In [6]:
schema = types.StructType([
    types.StructField("hvfhs_license_num" ,types.StringType(), True),
    types.StructField("dispatching_base_num" ,types.StringType(), True),
    types.StructField("pickup_datetime" ,types.TimestampType(), True),
    types.StructField("dropoff_datetime" ,types.TimestampType(), True),
    types.StructField("PULocationID" ,types.IntegerType(), True),
    types.StructField("DOLocationID" ,types.IntegerType(), True),
    types.StructField("SR_Flag" ,types.StringType(), True)
])

In [7]:
df = spark.read \
        .option("header", "true") \
        .schema(schema) \
        .csv("fhvhv_tripdata_2021-02.csv")

In [8]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-01 00:10:40|2021-02-01 00:21:09|          35|          39|   null|
|           HV0003|              B02764|2021-02-01 00:27:23|2021-02-01 00:44:01|          39|          35|   null|
|           HV0005|              B02510|2021-02-01 00:28:38|2021-02-01 00:38:27|          39|          91|   null|
|           HV0005|              B02510|2021-02-01 00:43:37|2021-02-01 01:23:20|          91|         228|   null|
|           HV0003|              B02872|2021-02-01 00:08:42|2021-02-01 00:17:57|         126|         250|   null|
|           HV0003|              B02872|2021-02-01 00:26:02|2021-02-01 00:42:51|

In [9]:
df = df.repartition(24)

In [10]:
df.write.parquet("fhvhv/2021/02/", mode="overwrite")

22/03/12 10:18:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
22/03/12 10:18:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
22/03/12 10:18:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
22/03/12 10:18:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
22/03/12 10:18:22 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
22/03/12 10:18:25 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
22/03/12 10:18:25 WARN MemoryManager: Total allocation exceeds 95.

In [11]:
!ls -lh fhvhv/2021/02/

total 203M
-rwxrwxrwx 1 fongt fongt    0 Mar 12 10:18 _SUCCESS
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00000-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00001-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00002-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00003-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00004-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00005-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00006-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-00007-f6ba5685-7491-47d8-82d0-209967cbbef3-c000.snappy.parquet
-rwxrwxrwx 1 fongt fongt 8.5M Mar 12 10:18 part-0

The size of the folder "fhvhv/2021/02/" is 203 MB.
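The `total` line printed by `ls -lh` reports allocated disk blocks, so it can differ slightly from the sum of the individual file sizes. As a cross-check, here is a minimal sketch that sums the file sizes directly in Python. `folder_size_mb` is a hypothetical helper name (not part of Spark), and it treats 1 MB as 1024 * 1024 bytes to match `ls -h`.

```python
from pathlib import Path


def folder_size_mb(folder):
    """Return the combined size of all regular files under `folder`, in MB.

    Hypothetical helper for illustration; 1 MB is taken as 1024 * 1024 bytes.
    """
    root = Path(folder)
    if not root.is_dir():
        # Nothing to measure if the folder does not exist.
        return 0.0
    total_bytes = sum(p.stat().st_size for p in root.rglob("*") if p.is_file())
    return total_bytes / (1024 * 1024)
```

In the notebook this could be called as `folder_size_mb("fhvhv/2021/02/")`; I have not rerun it against the folder above, so no expected value is shown.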

## Question 3. Counting Records

How many taxi trips were there on February 15th?

Consider only trips that started on February 15th.

In [12]:
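# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the replacement
df.createOrReplaceTempView("fhvhv")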

In [15]:
spark.sql("""
    SELECT
        COUNT(1) AS Feb15
    FROM
        fhvhv
    WHERE
        MONTH(pickup_datetime) = 2 AND DAYOFMONTH(pickup_datetime) = 15;
""").show()



+------+
| Feb15|
+------+
|367170|
+------+

There were 367,170 taxi trips that started on February 15, 2021.
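The `MONTH(...) = 2 AND DAYOFMONTH(...) = 15` filter works here because the file only contains February 2021 trips; it does not check the year. A variant that pins the full date is a half-open timestamp range. The sketch below builds such a range in plain Python. The names `day_bounds` and `started_on` are hypothetical, and the resulting `WHERE` clause is an assumption about how the query could be rewritten, not something I ran against the dataset.

```python
from datetime import date, datetime, time, timedelta


def day_bounds(day):
    """Return (start, end) for `day` as a half-open interval [start, end)."""
    start = datetime.combine(day, time.min)
    end = start + timedelta(days=1)
    return start, end


def started_on(pickup, day):
    """True if the pickup timestamp falls inside the given calendar day."""
    start, end = day_bounds(day)
    return start <= pickup < end


feb15 = date(2021, 2, 15)
start, end = day_bounds(feb15)

# Predicate that could replace the MONTH/DAYOFMONTH filter in the SQL above.
where_clause = f"pickup_datetime >= '{start}' AND pickup_datetime < '{end}'"
print(where_clause)
```

The half-open form keeps a trip that starts at 23:59:59 on the 15th and excludes one that starts at exactly midnight on the 16th.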

## Question 4. Longest Trip for Each Day

Now calculate the duration for each trip.

Trip starting on which day was the longest?

In [40]:
spark.sql("""
    SELECT
        TO_DATE(pickup_datetime) AS pickup_date,
        UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime) AS sec_diff
    FROM
        fhvhv
    ORDER BY 
        sec_diff DESC
    LIMIT
        5;
""").show()



+-----------+--------+
|pickup_date|sec_diff|
+-----------+--------+
| 2021-02-11|   75540|
| 2021-02-17|   57221|
| 2021-02-20|   44039|
| 2021-02-03|   40653|
| 2021-02-19|   37577|
+-----------+--------+

The longest trip started on February 11, 2021, and lasted 75,540 seconds (about 21 hours).
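Raw second counts are hard to read, so here is a small sketch that converts the five `sec_diff` values from the output above into hours, minutes, and seconds with `datetime.timedelta`. The dictionary is copied by hand from the query result; the variable names are my own.

```python
from datetime import timedelta

# pickup_date -> sec_diff, copied from the query output above.
top_durations = {
    "2021-02-11": 75540,
    "2021-02-17": 57221,
    "2021-02-20": 44039,
    "2021-02-03": 40653,
    "2021-02-19": 37577,
}

# Render each duration as H:MM:SS.
readable = {day: str(timedelta(seconds=s)) for day, s in top_durations.items()}
for day, duration in readable.items():
    print(day, duration)

# Day on which the longest of these trips started.
longest_day = max(top_durations, key=top_durations.get)
print("Longest:", longest_day, readable[longest_day])  # Longest: 2021-02-11 20:59:00
```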

## Question 5. Most Frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` in this dataset.

How many stages does this Spark job have?

> **_NOTE:_** The answer may depend on how you write the query, so there are multiple correct answers.  Select the one you have.

In [17]:
spark.sql("""
    SELECT
        COUNT(1) AS num_disp_base_num,
        dispatching_base_num
    FROM 
        fhvhv
    GROUP BY
        dispatching_base_num
    ORDER BY
        num_disp_base_num DESC
    LIMIT
        5;
""").show()



+-----------------+--------------------+
|num_disp_base_num|dispatching_base_num|
+-----------------+--------------------+
|          3233664|              B02510|
|           965568|              B02764|
|           882689|              B02872|
|           685390|              B02875|
|           559768|              B02765|
+-----------------+--------------------+

The most common `dispatching_base_num` is "B02510".

The Spark job had 4 stages:

![Question 5 Spark Job Pic](./images/hw-q5-jobs.png "Question 5 Spark Job")

## Question 6. Most Common Locations Pair

Find the most common pickup-dropoff pair.

For example:
"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use "Unknown."  For example, "Unknown / Clinton East."

In [20]:
!curl -O https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 12322  100 12322    0     0    797      0  0:00:15  0:00:15 --:--:--  2968


In [21]:
df_zones = spark.read \
            .option("header", "true") \
            .csv("taxi+_zone_lookup.csv")

In [23]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [24]:
df_zones.schema

StructType(List(StructField(LocationID,StringType,true),StructField(Borough,StringType,true),StructField(Zone,StringType,true),StructField(service_zone,StringType,true)))

In [25]:
zone_schema = types.StructType([
    types.StructField("LocationID", types.IntegerType(), True),
    types.StructField("Borough", types.StringType(), True),
    types.StructField("Zone", types.StringType(), True),
    types.StructField("service_zone", types.StringType(), True)
])

In [26]:
df_zones = spark.read \
            .option("header", "true") \
            .schema(zone_schema) \
            .csv("taxi+_zone_lookup.csv")

In [29]:
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [30]:
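# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView is the replacement
df_zones.createOrReplaceTempView("zones")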

In [38]:
spark.sql("""
    SELECT
        CONCAT(COALESCE(puzones.zone, 'Unknown'), '/', COALESCE(dozones.zone, 'Unknown')) AS PUDOpair,
        COUNT(1) AS num_occurs
    FROM
        fhvhv
            LEFT JOIN zones AS puzones
                ON fhvhv.PULocationID = puzones.LocationID
            LEFT JOIN zones AS dozones
                ON fhvhv.DOLocationID = dozones.LocationID
    GROUP BY
        PUDOpair
    ORDER BY
        num_occurs DESC
    LIMIT
        5;
""").show(5, False)



+---------------------------------------+----------+
|PUDOpair                               |num_occurs|
+---------------------------------------+----------+
|East New York/East New York            |45041     |
|Borough Park/Borough Park              |37329     |
|Canarsie/Canarsie                      |28026     |
|Crown Heights North/Crown Heights North|25976     |
|Bay Ridge/Bay Ridge                    |17934     |
+---------------------------------------+----------+

The most common pickup-dropoff pair is East New York / East New York.

## Bonus Question.  Join Type

For finding the answer to question 6, you'll need to perform a join.

What type of join is it?

How many stages does your Spark job have?

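The join is a broadcast join: Spark ships the small `zones` lookup table to every executor instead of shuffling the much larger trips table: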

![Question 6 Join Type](./images/hw-bonus-join-type.png "Question 6 Join Type")

My Spark job has 4 stages:

![Question 6 Spark Job Pic](./images/hw-q6-jobs.png "Question 6 Spark Job")
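To make the idea behind the broadcast join concrete, here is a conceptual sketch in plain Python, not Spark's actual implementation. The small lookup table sits in memory as a dictionary, and each trip row probes it once per side, with a missing ID falling back to "Unknown" in the same way as the `LEFT JOIN` plus `COALESCE` in the Question 6 query. The zone rows are taken from the `df_zones` output above; the trip pairs and the name `label_pairs` are made up for illustration.

```python
from collections import Counter

# Small lookup table (rows taken from the df_zones output above).
zones = {1: "Newark Airport", 2: "Jamaica Bay", 4: "Alphabet City", 7: "Astoria"}

# Hypothetical (PULocationID, DOLocationID) pairs, made up for illustration.
trips = [(7, 7), (7, 7), (2, 4), (999, 7), (None, 4)]


def label_pairs(trips, zone_lookup):
    """Yield 'pickup / dropoff' labels, probing the in-memory lookup per row."""
    for pu_id, do_id in trips:
        # .get(..., "Unknown") mirrors LEFT JOIN + COALESCE(zone, 'Unknown').
        pu_zone = zone_lookup.get(pu_id, "Unknown")
        do_zone = zone_lookup.get(do_id, "Unknown")
        yield f"{pu_zone} / {do_zone}"


# Equivalent of GROUP BY pair, ORDER BY count DESC, LIMIT 1.
pair_counts = Counter(label_pairs(trips, zones))
top_pair, top_count = pair_counts.most_common(1)[0]
print(top_pair, top_count)  # Astoria / Astoria 2
```

Because the lookup table is tiny compared with the trips data, keeping a full copy next to every task is cheap, which is the trade-off a broadcast join relies on.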