In [None]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
# Local Spark session using all available cores; shared by every cell below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

## Week 5 Homework

In this homework we'll put what we learned about Spark
into practice.

We'll use the High Volume For-Hire Vehicle (HVFHV) dataset for that.

## Question 1. Install Spark and PySpark

* Install Spark
* Run PySpark
* Create a local spark session 
* Execute `spark.version`

What's the output?

In [2]:
# Q1: version string of the Spark runtime behind the session created above.
spark.version

'3.2.1'

## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

In [3]:
%cd data
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

[Errno 2] No such file or directory: '../data'
/Users/lorenz/repos/data_engineering_zoomcamp/week_5_batch_processing
--2022-03-08 02:12:28--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.216.134.75
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.216.134.75|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-03-08 02:16:50 (2.69 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]




Read it with Spark using the same schema as we did 
in the lessons. We will use this dataset for all
the remaining questions.

Repartition it to 24 partitions and save it to
parquet.

What's the size of the folder with results (in MB)?

In [6]:
# Explicit schema for the HVFHV CSV (as used in the lessons) so Spark does not
# have to infer types by scanning the ~700 MB file.
# SR_Flag is read as a string. Spark's CSV reader only parses 'true'/'false' for
# BooleanType, so any other flag value (presumably '1' or empty in the raw file;
# verify) would be silently turned into null.
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

# First line of the CSV is a header; column types come from `schema`.
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhvhv_tripdata_2021-02.csv')

In [None]:
# Q2: shuffle the trips into 24 partitions, so the parquet folder gets 24 part files.
df = df.repartition(24)
# mode='overwrite' makes the cell re-runnable. The default mode ('errorifexists')
# raises if data/fhvhv/2021/02/ already exists from a previous run.
df.write.parquet('data/fhvhv/2021/02/', mode='overwrite')

In [17]:
!du -h data/fhvhv/2021/02/

221M	data/fhvhv/2021/02/


The measured size is 221 MB; the accepted answer is 208 MB. (`du` reports on-disk usage, which can differ from the total size of the files.)

## Question 3. Count records 

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [9]:
# Reload the repartitioned trips from parquet; the remaining questions query this frame.
df = spark.read.format('parquet').load('data/fhvhv/2021/02/')

In [11]:
# Q3: number of trips whose pickup date (date part of pickup_datetime) is 2021-02-15.
df.where(F.to_date('pickup_datetime') == F.lit('2021-02-15')).count()

                                                                                

367170

## Question 4. Longest trip for each day

Now calculate the duration for each trip.

Trip starting on which day was the longest? 

In [22]:
# Q4: longest trip per pickup day.
# duration = dropoff - pickup in seconds, computed via the epoch-second casts of
# the two timestamps. The maximum gets an explicit alias instead of relying on
# Spark's auto-generated 'max(duration)' column name.
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('duration', df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long')) \
    .groupBy('pickup_date') \
    .agg(F.max('duration').alias('max_duration')) \
    .orderBy('max_duration', ascending=False) \
    .limit(5) \
    .show()

[Stage 20:>                                                         (0 + 8) / 8]

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2021-02-11|        75540|
| 2021-02-17|        57221|
| 2021-02-20|        44039|
| 2021-02-03|        40653|
| 2021-02-19|        37577|
+-----------+-------------+



                                                                                

## Question 5. Most frequent `dispatching_base_num`

Now find the most frequently occurring `dispatching_base_num` 
in this dataset.

How many stages does this Spark job have?

> Note: the answer may depend on how you write the query,
> so there are multiple correct answers. 
> Select the one you have.

In [27]:
# Q5: the five most frequent dispatching_base_num values, by trip count.
# `.show()` prints the table seen in the output. `DataFrame.tail` requires a
# `num` argument, so the previous bare `.tail()` raised a TypeError.
df \
    .groupBy('dispatching_base_num') \
    .count() \
    .orderBy('count', ascending=False) \
    .limit(5) \
    .show()

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
+--------------------+-------+



According to the Spark UI (Jobs tab), the query had 2 stages.

## Question 6. Most common locations pair

Find the most common pickup-dropoff pair. 

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East". 

In [110]:
# Zone lookup table. NOTE(review): this notebook does not create data/zones;
# it presumably comes from the lessons (taxi zone lookup saved as parquet). Confirm.
df_zones = spark.read.format('parquet').load('data/zones')

In [111]:
# Normalise unknown/missing zone names to "Unknown", as Q6 requires.
# - Rows whose Borough is 'Unknown' (any case) get Zone = "Unknown". Per the
#   author's note, their Zone values are 'NA' and 'NV' beforehand.
# - Rows with a null Zone are mapped to "Unknown" as well.
# The column order is unchanged (withColumn replaces 'Zone' in place), so the
# previous `.toDF(*df_zones.columns)` was a no-op and has been dropped.
df_zones = df_zones \
    .withColumn(
        'Zone',
        F.when(
            (F.upper(df_zones.Borough) == 'UNKNOWN') | df_zones.Zone.isNull(),
            F.lit('Unknown'),
        ).otherwise(df_zones.Zone),
    )

In [115]:
def _prefix_zone_columns(zones, prefix):
    """Return `zones` with LocationID/Borough/Zone/service_zone renamed to `<prefix>_<name>`."""
    renamed = zones
    for column in ('LocationID', 'Borough', 'Zone', 'service_zone'):
        renamed = renamed.withColumnRenamed(column, f'{prefix}_{column}')
    return renamed


# Two differently-prefixed copies of the zone table, so it can be joined twice
# (once on pickup, once on dropoff) without ambiguous column names.
df_zones_pu = _prefix_zone_columns(df_zones.alias("df_zones_pu"), 'pu')
df_zones_do = _prefix_zone_columns(df_zones.alias("df_zones_do"), 'do')

In [118]:
# Q6: most common "pickup zone / dropoff zone" pairs.
# The left joins leave pu_Zone/do_Zone null when a location ID has no match in
# the zone table. F.concat returns null if any argument is null, which would drop
# such trips into a single null pair. Coalescing to "Unknown" yields
# "Unknown / <zone>" as the question asks.
df \
    .join(df_zones_pu, df.PULocationID == df_zones_pu.pu_LocationID, 'left') \
    .join(df_zones_do, df.DOLocationID == df_zones_do.do_LocationID, 'left') \
    .withColumn('pd_pair', F.concat(
        F.coalesce(df_zones_pu.pu_Zone, F.lit('Unknown')),
        F.lit(' / '),
        F.coalesce(df_zones_do.do_Zone, F.lit('Unknown')),
    )) \
    .groupBy('pd_pair') \
    .count() \
    .orderBy('count', ascending=False) \
    .take(3)

                                                                                

[Row(pd_pair='East New York / East New York', count=45041),
 Row(pd_pair='Borough Park / Borough Park', count=37329),
 Row(pd_pair='Canarsie / Canarsie', count=28026)]

## Bonus question. Join type

(not graded) 

For finding the answer to Q6, you'll need to perform a join.

What type of join is it?

And how many stages does your Spark job have?

It is a ``LEFT JOIN``. The Spark job used 3 stages:
1. Broadcast the zones data to every executor (broadcast join).
2. Join the trip data with the zones within each partition.
3. Shuffle the intermediate results and combine them into the final result.

## Submitting the solutions

* Form for submitting: https://forms.gle/dBkVK9yT8cSMDwuw7
* You can submit your homework multiple times. In this case, only the last submission will be used. 

Deadline: 07 March (Monday), 22:00 CET

In [119]:
# Stop the Spark session created at the top of the notebook.
spark.stop()