## Week 5 Homework

In this homework we'll put what we learned about Spark into practice.

We'll use the High Volume For-Hire Vehicles (HVFHV) dataset.

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

22/02/27 21:07:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
spark.version

'3.0.3'

## Question 2. HVFHV February 2021

Download the HVFHV data for February 2021:

wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

Read it with Spark using the same schema as we did in the lessons. We will use this dataset for all the remaining questions.

Repartition it to 24 partitions and save it to parquet.

What's the size of the folder with results (in MB)?

In [8]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-27 17:32:11--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.81.156
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.81.156|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-27 17:32:46 (19.9 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [11]:
!head -n 5 fhvhv_tripdata_2021-02.csv








In [12]:
!wc -l fhvhv_tripdata_2021-02.csv

11613943 fhvhv_tripdata_2021-02.csv


In [14]:
# define schema
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [15]:
# read the dataset
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

In [16]:
df.repartition(24) \
  .write.parquet('fhvhv/2021/02/')

                                                                                

In [18]:
!du -sh fhvhv/2021/02/

210M	fhvhv/2021/02/
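`du -sh` reports disk usage in human-readable units. To get a figure in MB from Python instead, a minimal sketch could look like the one below. The helper name `folder_size_mb` is hypothetical, it assumes 1 MB = 1024 * 1024 bytes, and it sums logical file sizes, so the result can differ slightly from `du`, which counts allocated disk blocks.

```python
import os


def folder_size_mb(path):
    """Sum the sizes of all files under `path`, in MB (1 MB = 1024 * 1024 bytes)."""
    total_bytes = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total_bytes += os.path.getsize(os.path.join(root, name))
    return total_bytes / (1024 * 1024)
```

Calling `folder_size_mb('fhvhv/2021/02/')` after the write step should give a value close to the `du` figure above.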


## Question 3. Count records

How many taxi trips were there on February 15?

Consider only trips that started on February 15.

In [4]:
df_new = spark.read.parquet('fhvhv/2021/02/')

                                                                                

In [5]:
df_new.columns

['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']

In [5]:
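# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent.
# Note: despite its name, this view holds the HVFHV trips, not green taxi trips.
df_new.createOrReplaceTempView('green_feb_2021')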

In [7]:
df_green_feb15_count = spark.sql("""
SELECT 
    COUNT(1) AS number_records
FROM
    green_feb_2021
WHERE
    DATE(pickup_datetime) = '2021-02-15'

""")



In [8]:
df_green_feb15_count.show()



+--------------+
|number_records|
+--------------+
|        367170|
+--------------+



                                                                                

In [6]:
import pyspark.sql.functions as F

In [18]:
df_result = df_new.withColumn("datetype", F.to_date(df_new.pickup_datetime, "yyyy-MM-dd"))

In [24]:
df_result.filter(df_result.datetype == '2021-02-15').count()

                                                                                

367170
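Both counts above compare only the date part of `pickup_datetime`, so a trip that starts on February 15 and ends on February 16 is included, while one that starts on February 14 and ends on February 15 is not. The plain-Python sketch below illustrates that rule on three hypothetical rows (the sample values are made up and are not taken from the dataset).

```python
from datetime import date, datetime

# Hypothetical (pickup, dropoff) pairs, for illustration only
trips = [
    (datetime(2021, 2, 14, 23, 50), datetime(2021, 2, 15, 0, 10)),  # starts Feb 14
    (datetime(2021, 2, 15, 0, 0), datetime(2021, 2, 15, 0, 20)),    # starts Feb 15
    (datetime(2021, 2, 15, 23, 55), datetime(2021, 2, 16, 0, 15)),  # starts Feb 15
]

target = date(2021, 2, 15)

# Keep only trips whose pickup falls on the target date; dropoff is ignored
started_on_target = [trip for trip in trips if trip[0].date() == target]
print(len(started_on_target))  # 2
```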

## Question 4. Longest trip for each day

Now calculate the duration for each trip.

On which day did the longest trip start?

In [18]:
spark.sql("""
SELECT
    pickup_datetime,
    dropoff_datetime,
    -- trip duration in minutes: difference in epoch seconds divided by 60
    (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) / 60 AS diff
FROM
    green_feb_2021
ORDER BY
    diff DESC
""").show(5)


+-------------------+-------------------+-----------------+
|    pickup_datetime|   dropoff_datetime|             diff|
+-------------------+-------------------+-----------------+
|2021-02-11 13:40:44|2021-02-12 10:39:44|           1259.0|
|2021-02-17 15:54:53|2021-02-18 07:48:34|953.6833333333333|
|2021-02-20 12:08:15|2021-02-21 00:22:14|733.9833333333333|
|2021-02-03 20:24:25|2021-02-04 07:41:58|           677.55|
|2021-02-19 23:17:44|2021-02-20 09:44:01|626.2833333333333|
+-------------------+-------------------+-----------------+
only showing top 5 rows



                                                                                

Reference for computing a timestamp difference in minutes in Spark SQL: https://stackoverflow.com/questions/60386256/sparksql-difference-between-two-time-stamps-in-minutes
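The `diff` column is the gap between the two timestamps in seconds, divided by 60. As a cross-check, the sketch below recomputes the top row of the output in plain Python; the helper name `duration_minutes` is hypothetical, and the two timestamps are copied from the first row of the table above.

```python
from datetime import datetime


def duration_minutes(pickup, dropoff):
    """Return the trip duration in minutes as a float."""
    return (dropoff - pickup).total_seconds() / 60


fmt = "%Y-%m-%d %H:%M:%S"
pickup = datetime.strptime("2021-02-11 13:40:44", fmt)
dropoff = datetime.strptime("2021-02-12 10:39:44", fmt)

longest = duration_minutes(pickup, dropoff)
print(longest)  # 1259.0
```

The value matches the first row, so the longest trip in the output started on 2021-02-11.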

## Question 5. Most frequent dispatching_base_num

Now find the most frequently occurring dispatching_base_num in this dataset.

How many stages does this Spark job have?

Note: the answer may depend on how you write the query, so there are multiple correct answers. Select the one you have.

In [20]:
spark.sql("""
SELECT
    dispatching_base_num,
    COUNT(*) AS record_count
FROM
    green_feb_2021
GROUP BY
    dispatching_base_num
ORDER BY
    record_count DESC
""").show(5)



+--------------------+------------+
|dispatching_base_num|record_count|
+--------------------+------------+
|              B02510|     3233664|
|              B02764|      965568|
|              B02872|      882689|
|              B02875|      685390|
|              B02765|      559768|
+--------------------+------------+
only showing top 5 rows



                                                                                

## Question 6. Most common locations pair

Find the most common pickup-dropoff pair.

For example:

"Jamaica Bay / Clinton East"

Enter two zone names separated by a slash.

If any of the zone names are unknown (missing), use "Unknown". For example, "Unknown / Clinton East".

In [21]:
df_new.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02887|2021-02-06 01:18:35|2021-02-06 01:40:34|         163|         235|   null|
|           HV0005|              B02510|2021-02-05 07:13:06|2021-02-05 07:31:56|         225|         181|   null|
|           HV0003|              B02869|2021-02-04 16:56:52|2021-02-04 17:21:36|         260|          95|   null|
|           HV0003|              B02871|2021-02-03 18:34:17|2021-02-03 18:57:12|         235|          60|   null|
|           HV0003|              B02869|2021-02-04 07:25:09|2021-02-04 07:30:34|          55|          55|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+

In [22]:
zone_df = spark.read \
    .option('header', 'true') \
    .csv('taxi+_zone_lookup.csv')

In [23]:
# registerTempTable is deprecated; createOrReplaceTempView is the current equivalent
zone_df.createOrReplaceTempView('zones')

In [25]:
zone_df.show(3)

+----------+-------+--------------------+------------+
|LocationID|Borough|                Zone|service_zone|
+----------+-------+--------------------+------------+
|         1|    EWR|      Newark Airport|         EWR|
|         2| Queens|         Jamaica Bay|   Boro Zone|
|         3|  Bronx|Allerton/Pelham G...|   Boro Zone|
+----------+-------+--------------------+------------+
only showing top 3 rows



In [28]:
spark.sql("""
SELECT
    g.PULocationID,
    g.DOLocationID,
    COUNT(g.PULocationID) AS record_count,
    zpu.Zone AS pick_up_zone,
    zdo.Zone AS dr_off_zone
FROM
    green_feb_2021 g
    JOIN zones zpu ON g.PULocationID = zpu.LocationID
    JOIN zones zdo ON g.DOLocationID = zdo.LocationID
GROUP BY
    g.PULocationID, g.DOLocationID, zpu.Zone, zdo.Zone
ORDER BY
    record_count DESC
""").show(5)



+------------+------------+------------+-------------------+-------------------+
|PULocationID|DOLocationID|record_count|       pick_up_zone|        dr_off_zone|
+------------+------------+------------+-------------------+-------------------+
|          76|          76|       45041|      East New York|      East New York|
|          26|          26|       37329|       Borough Park|       Borough Park|
|          39|          39|       28026|           Canarsie|           Canarsie|
|          61|          61|       25976|Crown Heights North|Crown Heights North|
|          14|          14|       17934|          Bay Ridge|          Bay Ridge|
+------------+------------+------------+-------------------+-------------------+
only showing top 5 rows
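The join in the query above keeps only trips whose pickup and dropoff IDs both appear in the lookup table, whereas the question asks for "Unknown" when a zone name is missing. The plain-Python sketch below shows that fallback on a handful of hypothetical trips. The zone names for IDs 76 and 26 come from the output above; ID 999 and the trip list are made up for illustration, and `zone_name` is a hypothetical helper.

```python
from collections import Counter

# Partial lookup table: names for IDs 76 and 26 as shown in the output above
zones = {76: "East New York", 26: "Borough Park"}

# Hypothetical (PULocationID, DOLocationID) pairs; 999 has no lookup entry
trips = [(76, 76), (76, 76), (26, 26), (999, 76), (76, 76)]


def zone_name(location_id):
    """Return the zone name, or "Unknown" when the ID is not in the lookup."""
    return zones.get(location_id, "Unknown")


# Count each "pickup / dropoff" label and take the most frequent one
pair_counts = Counter(f"{zone_name(pu)} / {zone_name(do)}" for pu, do in trips)
top_pair, top_count = pair_counts.most_common(1)[0]
print(top_pair, top_count)  # East New York / East New York 3
```

In Spark SQL, the same fallback could be expressed with a `LEFT JOIN` on the lookup table and `COALESCE(zone, 'Unknown')`.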



                                                                                