## Week 5 Homework

In this homework we'll put what we learned about Spark into practice.

We'll use the high volume for-hire vehicle (HVFHV) dataset for that.

In [1]:
import pandas as pd
import pyspark
from pyspark.sql import types, SparkSession, functions as F

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

### Question 1: Version of PySpark

* Install Spark
* Run PySpark
* Create a local Spark session
* Execute `spark.version`

What's the output?

In [3]:
spark.version

'3.0.3'

### Question 2: Size of HVFHV February 2021

Download the HVFHV data for February 2021:
```bash
wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
```
Read it with Spark using the same schema as we did in the lessons. We will use this dataset for all the remaining questions.

Repartition it to 24 partitions and save it to parquet.

What's the size of the folder with results (in MB)?

In [None]:
!wget --directory-prefix=data/raw/fhvhv https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

```bash
wc -l data/raw/fhvhv/fhvhv_tripdata_2021-02.csv
11613943 data/raw/fhvhv/fhvhv_tripdata_2021-02.csv
```

In [12]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [13]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/raw/fhvhv/fhvhv_tripdata_2021-02.csv')

In [15]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [18]:
df.repartition(24) \
    .write.parquet('data/pq/fhvhv/2021/02/')

```bash
ls -lh data/pq/fhvhv/2021/02/
total 206M
```
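`ls -lh` reports a `total` for the listed files. As a cross-check, here is a minimal stdlib sketch (the helper name `folder_stats` is hypothetical) that walks the output folder, counts the `part-*.parquet` files, of which 24 are expected after `repartition(24)`, and sums the sizes of all files. It assumes the data sits on the local filesystem and reports MiB (powers of 1024), the same unit `ls -h` uses:

```python
import os
import tempfile


def folder_stats(path):
    """Return (number of part-*.parquet files, total size of all files in MiB)."""
    total_bytes = 0
    n_parts = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
            if name.startswith("part-") and name.endswith(".parquet"):
                n_parts += 1
    # ls -h / du -h use powers of 1024, so convert bytes to MiB
    return n_parts, total_bytes / 1024 ** 2


if __name__ == "__main__":
    # Self-contained demo on a throwaway folder with three 1 MiB dummy "parts"
    with tempfile.TemporaryDirectory() as tmp:
        for i in range(3):
            with open(os.path.join(tmp, f"part-{i:05d}.snappy.parquet"), "wb") as f:
                f.write(b"\0" * 1024 * 1024)
        open(os.path.join(tmp, "_SUCCESS"), "wb").close()  # empty marker file
        print(folder_stats(tmp))  # (3, 3.0)
```

For the homework folder you would call `folder_stats('data/pq/fhvhv/2021/02/')`. On a local filesystem Spark typically also writes hidden `.crc` checksum files; they are included in the size but not in the part count.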

### Question 3: Records on February 15

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [4]:
df_fhvhv = spark.read.parquet('data/pq/fhvhv/*/*')

In [5]:
df_fhvhv.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [6]:
df_fhvhv = df_fhvhv \
    .withColumn('pickup_date', F.to_date('pickup_datetime'))

In [7]:
df_fhvhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- pickup_date: date (nullable = true)



In [8]:
df_fhvhv.show(10)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+
|           HV0003|              B02765|2021-02-03 19:55:14|2021-02-03 20:05:58|         143|         164|   null| 2021-02-03|
|           HV0003|              B02764|2021-02-04 12:57:01|2021-02-04 12:59:18|         205|         205|   null| 2021-02-04|
|           HV0003|              B02880|2021-02-04 20:27:39|2021-02-04 20:37:28|         258|         134|   null| 2021-02-04|
|           HV0003|              B02887|2021-02-05 06:21:51|2021-02-05 06:38:33|          57|         242|   null| 2021-02-05|
|           HV0003|              B02865|2021-02-01 07:17:28|2021-02-01 07:25:20|          76|          35|   nu

In [9]:
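# registerTempTable() has been deprecated since Spark 2.0; createOrReplaceTempView() replaces it
df_fhvhv.createOrReplaceTempView('trips_data')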

In [10]:
spark.sql("""
SELECT 
    COUNT(*) AS trips
FROM 
    trips_data
WHERE 
    pickup_date = '2021-02-15'
""").show()

+------+
| trips|
+------+
|367170|
+------+
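Filtering on `pickup_date` counts each trip on the day it *started*, so a trip that begins late on February 14 and ends on February 15 is not included. In the DataFrame API the same count would be something like `df_fhvhv.filter(F.col('pickup_date') == '2021-02-15').count()`. A stdlib sketch of the rule, using the five complete pickup/dropoff pairs from the `df_fhvhv.show(10)` output above plus one made-up trip that crosses midnight (`count_trips_started_on` is a hypothetical helper):

```python
from datetime import date, datetime

FMT = "%Y-%m-%d %H:%M:%S"


def count_trips_started_on(trips, day):
    """Count trips whose pickup falls on `day`, like to_date(pickup_datetime) = day."""
    return sum(
        1
        for pickup, _dropoff in trips
        if datetime.strptime(pickup, FMT).date() == day
    )


# (pickup_datetime, dropoff_datetime) pairs
trips = [
    ("2021-02-03 19:55:14", "2021-02-03 20:05:58"),
    ("2021-02-04 12:57:01", "2021-02-04 12:59:18"),
    ("2021-02-04 20:27:39", "2021-02-04 20:37:28"),
    ("2021-02-05 06:21:51", "2021-02-05 06:38:33"),
    ("2021-02-01 07:17:28", "2021-02-01 07:25:20"),
    ("2021-02-03 23:50:00", "2021-02-04 00:10:00"),  # hypothetical: ends on Feb 4
]

print(count_trips_started_on(trips, date(2021, 2, 4)))  # 2
```

The made-up trip ends on February 4 but is counted for February 3, which is exactly what "consider only trips that started on February 15" asks for.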



### Question 4: Day with the longest trip

Now calculate the duration for each trip.

Trip starting on which day was the longest?

In [11]:
spark.sql("""
SELECT 
    pickup_date,
    pickup_datetime,
    dropoff_datetime,
    (dropoff_datetime - pickup_datetime) AS trip_duration,
    (BIGINT(dropoff_datetime) - BIGINT(pickup_datetime)) AS trip_duration_int
FROM 
    trips_data
ORDER BY 
    trip_duration_int DESC
""").show(10)

+-----------+-------------------+-------------------+--------------------+-----------------+
|pickup_date|    pickup_datetime|   dropoff_datetime|       trip_duration|trip_duration_int|
+-----------+-------------------+-------------------+--------------------+-----------------+
| 2021-02-11|2021-02-11 13:40:44|2021-02-12 10:39:44| 20 hours 59 minutes|            75540|
| 2021-02-17|2021-02-17 15:54:53|2021-02-18 07:48:34|15 hours 53 minut...|            57221|
| 2021-02-20|2021-02-20 12:08:15|2021-02-21 00:22:14|12 hours 13 minut...|            44039|
| 2021-02-03|2021-02-03 20:24:25|2021-02-04 07:41:58|11 hours 17 minut...|            40653|
| 2021-02-19|2021-02-19 23:17:44|2021-02-20 09:44:01|10 hours 26 minut...|            37577|
| 2021-02-25|2021-02-25 17:13:35|2021-02-26 02:57:05|9 hours 43 minute...|            35010|
| 2021-02-20|2021-02-20 01:36:13|2021-02-20 11:16:19|9 hours 40 minute...|            34806|
| 2021-02-18|2021-02-18 15:24:19|2021-02-19 01:01:11|9 hours 36 minute
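The `BIGINT(...)` casts in the query above turn each timestamp into seconds since the epoch, so their difference is the trip duration in seconds, a plain number that sorts cleanly. A stdlib check of that arithmetic on the first three rows of the output (`duration_seconds` is a hypothetical helper; naive timestamps are fine here because no daylight-saving change falls in February):

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"

# (pickup_datetime, dropoff_datetime) from the first three rows above
rows = [
    ("2021-02-11 13:40:44", "2021-02-12 10:39:44"),
    ("2021-02-17 15:54:53", "2021-02-18 07:48:34"),
    ("2021-02-20 12:08:15", "2021-02-21 00:22:14"),
]


def duration_seconds(pickup, dropoff):
    """Seconds between two timestamps, like BIGINT(dropoff) - BIGINT(pickup)."""
    delta = datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)
    return int(delta.total_seconds())


longest = max(rows, key=lambda r: duration_seconds(*r))
hours, rest = divmod(duration_seconds(*longest), 3600)
print(longest[0][:10], duration_seconds(*longest), hours, rest // 60)  # 2021-02-11 75540 20 59
```

The result matches the first row of the Spark output: 75540 s, i.e. 20 hours 59 minutes, for the trip that started on 2021-02-11.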

### Question 5: Stages for most frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` in this dataset.

How many stages does this Spark job have?

> Note: the answer may depend on how you write the query, so there are multiple correct answers. Select the one you have.

In [14]:
spark.sql("""
SELECT 
    dispatching_base_num,
    COUNT(*) AS trips
FROM 
    trips_data
GROUP BY 
    dispatching_base_num
ORDER BY 
    trips DESC
""").show(10)

+--------------------+-------+
|dispatching_base_num|  trips|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
+--------------------+-------+
only showing top 10 rows



>Answer: 2 stages.
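The two stages come from how a `GROUP BY` is executed: each task first computes partial counts for its own partition, then the partial results are shuffled by key and merged, and that shuffle is the boundary between the stages. A plain-Python model of this two-phase aggregation (a conceptual sketch, not Spark's code; the partition contents are made up):

```python
from collections import Counter

# Hypothetical dispatching_base_num values, already split into partitions
partitions = [
    ["B02510", "B02764", "B02510"],
    ["B02510", "B02872"],
    ["B02764", "B02510", "B02872", "B02872"],
]

# Stage 1: every task counts the keys in its own partition (partial aggregate)
partials = [Counter(p) for p in partitions]

# Shuffle + stage 2: partial counts for the same key are brought together and summed
totals = Counter()
for partial in partials:
    totals.update(partial)

# ORDER BY trips DESC followed by show(n) only needs the top rows
print(totals.most_common(2))  # [('B02510', 4), ('B02872', 3)]
```

To see the stages of the real job, open the Spark UI (by default at http://localhost:4040 while the session is running) and look at the job triggered by `.show(10)`.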

### Question 6: Most common location pair

Find the most common pickup-dropoff pair.

For example:

`"Jamaica Bay / Clinton East"`

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use `"Unknown"`. For example, `"Unknown / Clinton East"`.

In [15]:
!wget --directory-prefix=data/raw/zones https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-02-28 13:33:53--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.95.173
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.95.173|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: 'data/raw/zones/taxi+_zone_lookup.csv'

     0K .......... ..                                         100% 2,58M=0,005s

2022-02-28 13:33:53 (2,58 MB/s) - 'data/raw/zones/taxi+_zone_lookup.csv' saved [12322/12322]



In [19]:
zones_schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True),
])

In [20]:
df_zones = spark.read \
    .option("header", "true") \
    .schema(zones_schema) \
    .csv('data/raw/zones/taxi+_zone_lookup.csv')

In [21]:
df_zones.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [35]:
df_zones.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



In [57]:
df_zpu = df_zones \
    .withColumnRenamed('LocationID', 'zpuLocationID') \
    .withColumnRenamed('Zone', 'zpuZone') \
    .drop('Borough', 'service_zone')

df_zdo = df_zones \
    .withColumnRenamed('LocationID', 'zdoLocationID') \
    .withColumnRenamed('Zone', 'zdoZone') \
    .drop('Borough', 'service_zone')

In [66]:
df_join = df_fhvhv \
    .join(df_zpu, df_fhvhv.PULocationID == df_zpu.zpuLocationID) \
    .join(df_zdo, df_fhvhv.DOLocationID == df_zdo.zdoLocationID)

In [68]:
df_join.show(10)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+--------------------+-------------+--------------------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|pickup_date|zpuLocationID|             zpuZone|zdoLocationID|             zdoZone|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+-----------+-------------+--------------------+-------------+--------------------+
|           HV0003|              B02765|2021-02-03 19:55:14|2021-02-03 20:05:58|         143|         164|   null| 2021-02-03|          143| Lincoln Square West|          164|       Midtown South|
|           HV0003|              B02764|2021-02-04 12:57:01|2021-02-04 12:59:18|         205|         205|   null| 2021-02-04|          205|        Saint Albans|          205|        Saint Albans|
|           HV0

In [70]:
df_join \
    .drop('SR_Flag', 'pickup_date', 'zpuLocationID', 'zdoLocationID') \
    .write.parquet('data/pq/locations-pairs')

In [71]:
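# registerTempTable() has been deprecated since Spark 2.0; createOrReplaceTempView() replaces it
df_join.createOrReplaceTempView('locations_table')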

In [82]:
spark.sql("""
SELECT 
    CONCAT(COALESCE(zpuZone, 'Unknown'), '/', COALESCE(zdoZone, 'Unknown')) AS locations,
    COUNT(1) AS trips
FROM 
    locations_table
GROUP BY 
    locations
ORDER BY 
    trips DESC
""").show(10)

+--------------------+-----+
|           locations|trips|
+--------------------+-----+
|East New York/Eas...|45041|
|Borough Park/Boro...|37329|
|   Canarsie/Canarsie|28026|
|Crown Heights Nor...|25976|
| Bay Ridge/Bay Ridge|17934|
|Jackson Heights/J...|14688|
|     Astoria/Astoria|14688|
|Central Harlem No...|14481|
|Bushwick South/Bu...|14424|
|Flatbush/Ditmas P...|13976|
+--------------------+-----+
only showing top 10 rows
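`show()` truncates string values longer than 20 characters by default, which is why the pairs above end in `...`; `.show(10, truncate=False)` prints the full zone names. The question also expects the names separated by `" / "` (with spaces), while the query concatenates them with a bare `'/'`; `CONCAT(COALESCE(zpuZone, 'Unknown'), ' / ', COALESCE(zdoZone, 'Unknown'))` would produce the expected format directly. A stdlib sketch of the same coalesce-then-count logic, on made-up joined rows where `None` stands for a missing zone name (`coalesce` and `label` are hypothetical helpers):

```python
from collections import Counter

# Hypothetical (pickup zone, dropoff zone) pairs after the join; None = missing name
joined = [
    ("Jamaica Bay", "Clinton East"),
    (None, "Clinton East"),
    ("Jamaica Bay", "Clinton East"),
    ("Astoria", None),
    (None, "Clinton East"),
    (None, "Clinton East"),
]


def coalesce(value, default="Unknown"):
    """Return value unless it is None, like SQL COALESCE(value, default)."""
    return default if value is None else value


def label(pu_zone, do_zone):
    """Format a pair the way the question expects: 'Pickup / Dropoff'."""
    return f"{coalesce(pu_zone)} / {coalesce(do_zone)}"


pair_counts = Counter(label(pu, do) for pu, do in joined)
print(pair_counts.most_common(1))  # [('Unknown / Clinton East', 3)]
```

The explicit `is None` check mirrors `COALESCE`, which replaces only nulls; a shortcut like `value or 'Unknown'` would also replace empty strings.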



### Bonus question: Join type

To answer Q6, you'll need to perform a join.

What type of join is it?

And how many stages does your Spark job have?

>Answer: Inner join. Spark job has 2 stages.
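With an inner join, only trips whose `PULocationID` and `DOLocationID` both match a row in the zone lookup survive, so `COALESCE(..., 'Unknown')` in the Q6 query only replaces zone names that are themselves null in the lookup; trips with an unmatched ID are dropped before it runs. A left join (`how='left'` in `DataFrame.join`) would keep them with null zone names instead. Because the lookup is tiny, either variant can be executed as a hash join: build a dictionary from the small side and probe it with every trip. Whether Spark actually broadcasts the lookup this way depends on `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). A plain-Python sketch of the idea; the zones are the first rows of `df_zones` above, the trips are made up, and `hash_join` is a hypothetical helper:

```python
# Small side of the join: LocationID -> Zone
zones = {1: "Newark Airport", 2: "Jamaica Bay", 4: "Alphabet City"}

# Big side: hypothetical (PULocationID, DOLocationID) pairs; 999 has no match
trips = [(1, 2), (2, 4), (999, 4), (4, 1)]


def hash_join(trips, zones, how="inner"):
    """Look up both IDs of each trip; 'inner' drops unmatched trips, 'left' keeps them as None."""
    out = []
    for pu, do in trips:
        if how == "inner" and (pu not in zones or do not in zones):
            continue  # inner join: no match on either side means no output row
        out.append((zones.get(pu), zones.get(do)))
    return out


print(len(hash_join(trips, zones)))            # 3
print(hash_join(trips, zones, how="left")[2])  # (None, 'Alphabet City')
```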