# Week 5 Homework

In this homework we'll put what we learned about Spark into practice.

For this homework we will use the FHVHV 2021-06 data: [FHVHV Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-06.csv.gz)

```python
import warnings

warnings.filterwarnings("ignore")
```

## Question 1

**Install Spark and PySpark**

- Install Spark
- Run PySpark
- Create a local Spark session
- Execute `spark.version`

```bash
python download_datasets.py
Downloading: data/raw/fhvhv/2021/06/fhvhv_tripdata_2021-06.csv.gz
100% [......................................................................] 175799316 / 175799316
Downloaded the following files:
  data/raw/fhvhv/2021/06/fhvhv_tripdata_2021-06.csv.gz
```
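The `download_datasets.py` script itself is not shown in this notebook. As a rough idea of what such a helper could do, here is a minimal sketch that assumes it simply mirrors the release URL into `data/raw/<service>/<year>/<month>/`; the function names `local_path` and `download` are hypothetical, not taken from the actual script.

```python
from pathlib import Path
from urllib.request import urlretrieve

# Release URL prefix, taken from the dataset link above
BASE_URL = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download"


def local_path(service: str, year: int, month: int, root: str = "data/raw") -> Path:
    """Build the local target, e.g. data/raw/fhvhv/2021/06/fhvhv_tripdata_2021-06.csv.gz."""
    file_name = f"{service}_tripdata_{year}-{month:02d}.csv.gz"
    return Path(root) / service / str(year) / f"{month:02d}" / file_name


def download(service: str, year: int, month: int) -> Path:
    """Download one monthly file into the local mirror, skipping it if it already exists."""
    dest = local_path(service, year, month)
    if not dest.exists():
        dest.parent.mkdir(parents=True, exist_ok=True)
        url = f"{BASE_URL}/{service}/{dest.name}"
        print(f"Downloading: {dest}")
        urlretrieve(url, dest)
    return dest
```

Calling `download("fhvhv", 2021, 6)` would fetch the same file and place it at the path shown in the log above.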

```python
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('homework') \
    .getOrCreate()

spark.version
```

```text
23/03/05 17:20:16 WARN Utils: Your hostname, ROG-STRIX resolves to a loopback address: 127.0.1.1; using 172.27.115.120 instead (on interface eth0)
23/03/05 17:20:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/05 17:20:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
'3.3.2'
```

What's the output?

* [X] **3.3.2**
* [ ] 2.1.4
* [ ] 1.2.3
* [ ] 5.4

## Question 2

**HVFHV June 2021**

Read it with Spark using the same schema as we did in the lessons.\
We will use this dataset for all the remaining questions.\
Repartition it to 12 partitions and save it to parquet.

```python
import pandas as pd

# Read only the first 1,000 rows to inspect the columns and infer a schema
df_fhvhv_pd = pd.read_csv(
    "data/raw/fhvhv/2021/06/fhvhv_tripdata_2021-06.csv.gz",
    parse_dates=["pickup_datetime", "dropoff_datetime"],
    nrows=1000)
df_fhvhv_pd.head()
```

|   | dispatching_base_num | pickup_datetime | dropoff_datetime | PULocationID | DOLocationID | SR_Flag | Affiliated_base_number |
|---|---|---|---|---|---|---|---|
| 0 | B02764 | 2021-06-01 00:02:41 | 2021-06-01 00:07:46 | 174 | 18 | N | B02764 |
| 1 | B02764 | 2021-06-01 00:16:16 | 2021-06-01 00:21:14 | 32 | 254 | N | B02764 |
| 2 | B02764 | 2021-06-01 00:27:01 | 2021-06-01 00:42:11 | 240 | 127 | N | B02764 |
| 3 | B02764 | 2021-06-01 00:46:08 | 2021-06-01 00:53:45 | 127 | 235 | N | B02764 |
| 4 | B02510 | 2021-06-01 00:45:42 | 2021-06-01 01:03:33 | 144 | 146 | N | |


```python
df_fhvhv_pd.isnull().sum()
```

```text
dispatching_base_num        0
pickup_datetime             0
dropoff_datetime            0
PULocationID                0
DOLocationID                0
SR_Flag                     0
Affiliated_base_number    265
dtype: int64
```

```python
# Keep only rows with an Affiliated_base_number so schema inference sees strings rather than NaN floats
df_fhvhv_pd = df_fhvhv_pd[df_fhvhv_pd["Affiliated_base_number"].notna()]
```

```python
spark.createDataFrame(df_fhvhv_pd).schema
```

```text
StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])
```

```python
from pyspark.sql import types

# Same columns as the inferred schema, but with 32-bit integers for the location IDs
fhvhv_schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

input_path = 'data/raw/fhvhv/2021/06/'
output_path = 'data/pq/fhvhv/2021/06/'

# Spark decompresses the .csv.gz file transparently based on its extension
df_fhvhv = spark.read \
    .option("header", "true") \
    .schema(fhvhv_schema) \
    .csv(input_path)

# Shuffle into 12 partitions; each partition is written as one Parquet file
df_fhvhv \
    .repartition(12) \
    .write.parquet(output_path, mode='overwrite')
```

```text
23/03/05 17:21:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/03/05 17:21:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/03/05 17:21:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/03/05 17:21:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/05 17:21:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
23/03/05 17:21:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/05 17:21:41 WARN MemoryManager: Total allocation exceeds 95.
```
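Calling `repartition(12)` without column arguments makes Spark shuffle the rows into 12 partitions using round-robin partitioning, so every partition (and therefore every output file) ends up with nearly the same number of rows. The toy sketch below imitates that idea in plain Python; it is not Spark's implementation, which among other things picks a pseudo-random starting position for each input partition (modelled here by the hypothetical `start` argument).

```python
def round_robin(rows, num_partitions, start=0):
    """Distribute rows across num_partitions buckets in round-robin order."""
    partitions = [[] for _ in range(num_partitions)]
    position = start
    for row in rows:
        partitions[position].append(row)
        position = (position + 1) % num_partitions  # wrap around after the last bucket
    return partitions


parts = round_robin(range(1_000_003), 12, start=5)
sizes = [len(p) for p in parts]
print(min(sizes), max(sizes))  # partition sizes differ by at most one row
```

Equal row counts only translate into roughly equal file sizes: compression still varies a little from file to file.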



```python
!ls -lh data/pq/fhvhv/2021/06 | grep -v crc
```

```text
total 285M
-rw-r--r-- 1 clamytoe clamytoe   0 Mar  5 17:21 _SUCCESS
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00000-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00001-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00002-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00003-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00004-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00005-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00006-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.parquet
-rw-r--r-- 1 clamytoe clamytoe 24M Mar  5 17:21 part-00007-7a61195f-6794-4aa8-b43a-e402a215ed0a-c000.snappy.par
```

What is the average size of the Parquet files (ending with the .parquet extension) that were created (in MB)?\
Select the answer which most closely matches.

* [ ] 2MB
* [X] **24MB**
* [ ] 100MB
* [ ] 250MB
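The listing above is cut off after eight files, but the `total 285M` line already points at the answer: about 285 MB spread over 12 files is roughly 24 MB per file. To compute the average directly instead of reading it off `ls`, a small standard-library sketch could look like this (the function name `average_parquet_size_mb` is hypothetical):

```python
from pathlib import Path


def average_parquet_size_mb(directory):
    """Average size of the *.parquet files in a directory, in MiB (what `ls -lh` prints as M).

    _SUCCESS markers and .crc checksum files are skipped because they do not end in .parquet.
    """
    sizes = [p.stat().st_size for p in Path(directory).glob("*.parquet")]
    if not sizes:
        raise FileNotFoundError(f"no .parquet files in {directory}")
    return sum(sizes) / len(sizes) / 1024**2
```

For this homework the call would be `average_parquet_size_mb('data/pq/fhvhv/2021/06')`.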

## Question 3

**Count records**  

How many taxi trips were there on June 15?\
Consider only trips that started on June 15.

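```python
df_fhvhv = spark.read.parquet('data/pq/fhvhv/2021/06')
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView registers the same SQL view
df_fhvhv.createOrReplaceTempView('trips_data')
```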

```python
spark.sql("""
SELECT
    DAYOFMONTH(pickup_datetime) as day,
    COUNT(1) as trips
FROM
    trips_data
WHERE
    DAYOFMONTH(pickup_datetime) = 15
GROUP BY
    DAYOFMONTH(pickup_datetime)
""").show()
```

```text
+---+------+
|day| trips|
+---+------+
| 15|452470|
+---+------+
```

* [ ] 308,164
* [ ] 12,856
* [X] **452,470**
* [ ] 50,982
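`DAYOFMONTH(pickup_datetime) = 15` works here only because the file is expected to hold June 2021 pickups; a stray record from the 15th of another month would be counted as well. Comparing the full calendar date is the stricter filter (in Spark SQL, for example `TO_DATE(pickup_datetime) = '2021-06-15'`). The difference is easy to see on a few made-up timestamps in plain Python:

```python
from datetime import date, datetime

# Hypothetical pickup timestamps; the last one is from May, not June.
pickups = [
    datetime(2021, 6, 15, 0, 2, 41),
    datetime(2021, 6, 15, 23, 59, 59),
    datetime(2021, 6, 16, 0, 0, 0),
    datetime(2021, 5, 15, 12, 30, 0),
]

# Mirrors WHERE DAYOFMONTH(pickup_datetime) = 15
by_day_of_month = sum(1 for ts in pickups if ts.day == 15)

# Mirrors WHERE TO_DATE(pickup_datetime) = '2021-06-15'
by_full_date = sum(1 for ts in pickups if ts.date() == date(2021, 6, 15))

print(by_day_of_month, by_full_date)  # 3 2
```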

## Question 4

**Longest trip for each day**  

Now calculate the duration for each trip.\
How long was the longest trip in Hours?

```python
spark.sql("""
WITH tripdata AS (
    SELECT
        dispatching_base_num,
        pickup_datetime,
        dropoff_datetime,
        PULocationID,
        DOLocationID,
        SR_Flag,
        Affiliated_base_number,
        BIGINT(dropoff_datetime - pickup_datetime) as seconds
    FROM
        trips_data
),
trip_duration AS (
    SELECT
        (t.seconds/3600) AS hours,
        DENSE_RANK() OVER(ORDER BY t.seconds DESC) AS rank
    FROM
        tripdata t
)
SELECT
    td.hours
FROM
    trip_duration td
WHERE
    td.rank = 1
""").show()
```

```text
23/03/05 17:22:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------------+
|           hours|
+----------------+
|66.8788888888889|
+----------------+
```

* [X] **66.87 Hours**
* [ ] 243.44 Hours
* [ ] 7.68 Hours
* [ ] 3.32 Hours
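The query computes each trip's duration in seconds, ranks every trip in one window, and keeps rank 1. Because that window has no `PARTITION BY`, Spark moves all rows into a single partition, which is what the `WindowExec` warning is about; for a single maximum, a plain aggregate such as `MAX(UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)) / 3600` should avoid the window entirely (a suggestion, not what was run above). The arithmetic itself is sketched below in plain Python: the first pair is the first row of the pandas preview, the other two are made up. It also converts the reported 66.8788… hours back to whole seconds.

```python
from datetime import datetime

# (pickup, dropoff) pairs standing in for trips_data rows; only the first is real data.
trips = [
    (datetime(2021, 6, 1, 0, 2, 41), datetime(2021, 6, 1, 0, 7, 46)),
    (datetime(2021, 6, 2, 8, 0, 0), datetime(2021, 6, 2, 10, 30, 0)),
    (datetime(2021, 6, 3, 12, 0, 0), datetime(2021, 6, 3, 12, 45, 0)),
]

# Same arithmetic as BIGINT(dropoff_datetime - pickup_datetime) / 3600
durations_hours = [(drop - pick).total_seconds() / 3600 for pick, drop in trips]
longest = max(durations_hours)
print(longest)  # 2.5

# The answer above, 66.8788... hours, is a whole number of seconds:
print(round(66.8788888888889 * 3600))  # 240764, i.e. 66 h 52 min 44 s
```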

## Question 5

**User Interface**

On which local port does Spark's user interface, which shows the application's dashboard, run?

```bash
spark-shell
23/03/05 17:29:36 WARN Utils: Your hostname, ROG-STRIX resolves to a loopback address: 127.0.1.1; using 172.27.115.120 instead (on interface eth0)
23/03/05 17:29:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/05 17:29:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://172.27.115.120:4040
Spark context available as 'sc' (master = local[*], app id = local-1678058983692).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.2)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
```

* [ ] 80
* [ ] 443
* [X] **4040**
* [ ] 8080
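The `spark-shell` banner above confirms it: `Spark context Web UI available at http://172.27.115.120:4040`. Port 4040 is the default of the `spark.ui.port` setting; when it is already taken, for example by another running Spark application, Spark tries 4041, 4042 and so on, up to `spark.port.maxRetries` extra attempts (16 by default). The sketch below imitates that fallback with the standard `socket` module; the function `first_free_port` is a hypothetical illustration of the behaviour, not Spark's code.

```python
import socket


def first_free_port(start=4040, max_retries=16, host="127.0.0.1"):
    """Return the first port from start upward that can be bound, mimicking Spark UI's retry loop."""
    for offset in range(max_retries + 1):
        port = start + offset
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # port already in use: try the next one
            return port  # the probe socket is closed when the with-block exits
    raise RuntimeError(f"no free port in {start}..{start + max_retries}")
```

From a running PySpark session, the actual address can also be read from `spark.sparkContext.uiWebUrl`.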

## Question 6

**Most frequent pickup location zone**

Load the zone lookup data into a temp view in Spark\
[Zone Data](https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv)

Using the zone lookup data and the fhvhv June 2021 data, what is the name of the most frequent pickup location zone?

```bash
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
--2023-03-05 17:22:44--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv
Resolving github.com (github.com)... 140.82.114.3
Connecting to github.com (github.com)|140.82.114.3|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230305T232245Z&X-Amz-Expires=300&X-Amz-Signature=b888d6deaf6f1eca1ea6419c21d80ad6bff4032f60fbe50e89045fd95ed8e5c5&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream [following]
--2023-03-05 17:22:44--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/5a2cc2f5-b4cd-4584-9c62-a6ea97ed0e6a?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20230305%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20230305T232245Z&X-Amz-Expires=300&X-Amz-Signature=b888d6deaf6f1eca1ea6419c21d80ad6bff4032f60fbe50e89045fd95ed8e5c5&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dtaxi_zone_lookup.csv&response-content-type=application%2Foctet-stream
Resolving objects.githubusercontent.com (objects.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to objects.githubusercontent.com (objects.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi_zone_lookup.csv’

taxi_zone_lookup.cs 100%[===================>]  12.03K  --.-KB/s    in 0s      

2023-03-05 17:22:45 (46.3 MB/s) - ‘taxi_zone_lookup.csv’ saved [12322/12322]
```

```python
zones_df = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')
zones_df.write.parquet('zones', mode='overwrite')
```
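Reading the CSV with only `header=true` (no schema and no `inferSchema`) leaves every column as a string, so `LocationID` in the zones data is a `StringType` column while `PULocationID` is an integer; Spark has to coerce one side when `df_fhvhv.PULocationID == df_zones.LocationID` is evaluated in the join. A plain-Python sketch of the same lookup, using `csv.DictReader` and converting the key to `int` explicitly (the three sample rows match the `df_zones.show()` output; `load_zones` is a hypothetical helper):

```python
import csv
import io

# Sample rows in the lookup's column layout; in practice, open taxi_zone_lookup.csv instead.
SAMPLE = """LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
"""


def load_zones(text_stream):
    """Map LocationID (cast to int, since csv yields strings) to the zone name."""
    return {int(row["LocationID"]): row["Zone"] for row in csv.DictReader(text_stream)}


zones = load_zones(io.StringIO(SAMPLE))
print(zones[4])  # Alphabet City
```

With the real file: `with open("taxi_zone_lookup.csv", newline="") as f: zones = load_zones(f)`.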

```python
!ls -lh zones/
```

```text
total 8.0K
-rw-r--r-- 1 clamytoe clamytoe    0 Mar  5 17:23 _SUCCESS
-rw-r--r-- 1 clamytoe clamytoe 5.8K Mar  5 17:23 part-00000-d1160878-1834-41f1-90e2-fd53a683ca21-c000.snappy.parquet
```


```python
df_zones = spark.read.parquet('zones/')
df_zones.show()
```

```text
+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly
```

```python
df_result = df_fhvhv.join(df_zones, df_fhvhv.PULocationID == df_zones.LocationID)
df_result.show()
```

```text
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|LocationID|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+---------+--------------------+------------+
|              B02867|2021-06-22 17:34:24|2021-06-22 17:55:47|         181|          33|      N|                B02867|       181| Brooklyn|          Park Slope|   Boro Zone|
|              B02510|2021-06-21 19:21:44|2021-06-21 20:29:35|         132|         265|      N|                  null|       132|   Queens|         JFK Airport|    Airports|
|              B02869|2021-06-26 18:06:05|2021-06-26 18:33:21|          63|          17|      N|                B02869|      
```

```python
df_result \
    .drop('LocationID', 'PULocationID') \
    .write.parquet('tmp/revenue-zones', mode='overwrite')
```

```text
23/03/05 17:23:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
23/03/05 17:23:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/03/05 17:23:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/03/05 17:23:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/05 17:23:59 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
23/03/05 17:24:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
23/03/05 17:24:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
23/03/05 17:24:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
23/03/05 17:24:21 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
```

```python
df_result = spark.read.parquet('tmp/revenue-zones')
df_result.show()
```

```text
+--------------------+-------------------+-------------------+------------+-------+----------------------+---------+--------------------+------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|DOLocationID|SR_Flag|Affiliated_base_number|  Borough|                Zone|service_zone|
+--------------------+-------------------+-------------------+------------+-------+----------------------+---------+--------------------+------------+
|              B02871|2021-06-27 16:44:17|2021-06-27 16:54:24|         119|      N|                B02871|    Bronx|University Height...|   Boro Zone|
|              B02878|2021-06-08 02:32:54|2021-06-08 02:38:50|         159|      N|                B02878|    Bronx|Mott Haven/Port M...|   Boro Zone|
|              B02835|2021-06-26 02:33:35|2021-06-26 02:39:42|         210|      N|                B02835| Brooklyn|           Homecrest|   Boro Zone|
|              B02878|2021-06-28 13:31:47|2021-06-28 14:13:17|         210|      N|           
```

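```python
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView registers the same SQL view
df_result.createOrReplaceTempView('joined_data')
```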

```python
spark.sql("""
WITH zonedata AS (
    SELECT
        Zone,
        COUNT(1) as count,
        DENSE_RANK() OVER(ORDER BY COUNT(1) DESC) AS rank
    FROM
        joined_data
    GROUP BY
        Zone
)

SELECT
    z.Zone,
    z.count
FROM
    zonedata z
WHERE
    z.rank = 1
""").show()
```

```text
23/03/05 17:25:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+
```

* [ ] East Chelsea
* [ ] Astoria
* [ ] Union Sq
* [X] **Crown Heights North**
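The final query counts trips per pickup zone and keeps every zone with `DENSE_RANK() = 1`, so a tie for first place would return several rows rather than one. The same count-and-rank logic is sketched below in plain Python on hypothetical pickup location IDs; the three zone names are taken from the `df_zones.show()` output, and skipping IDs missing from the lookup mirrors the inner join.

```python
from collections import Counter

# Hypothetical lookup and pickups; IDs and names follow the taxi_zone_lookup.csv rows shown above.
zones = {1: "Newark Airport", 7: "Astoria", 13: "Battery Park City"}
pickup_location_ids = [7, 13, 7, 1, 13, 7, 13]

# Inner join + GROUP BY Zone + COUNT(1)
counts = Counter(zones[loc] for loc in pickup_location_ids if loc in zones)

# DENSE_RANK() OVER (ORDER BY COUNT(1) DESC) = 1 keeps every zone tied for the top count
top = max(counts.values())
most_frequent = sorted(zone for zone, n in counts.items() if n == top)
print(most_frequent)  # ['Astoria', 'Battery Park City']
```

In the homework data there is no tie, so the query returns the single row `Crown Heights North`.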

## Submitting the solutions

- Form for submitting: <https://forms.gle/EcSvDs6vp64gcGuD8>
- You can submit your homework multiple times. In this case, only the last submission will be used.

Deadline: 06 March (Monday), 22:00 CET