# Assignment Session 6 - Spark SQL and Dataframes

## Importing Spark Libraries and Creating Spark Session

In [6]:
import pyspark
import wget

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("assignment_6") \
    .getOrCreate()

In [4]:
spark

## Load the Data

### Yellow Taxi Tripdata February 2021

In [23]:
df_yellow = spark.read.parquet(wget.download("https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-02.parquet"))
df_yellow.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2021-02-01 07:40:47|  2021-02-01 07:48:28|            1.0|          2.3|       1.0|                 N|         141|         226|           2|        8.5|  3.0|    0.5|       0.

### Green Taxi Tripdata February 2021

In [24]:
df_green = spark.read.parquet(wget.download("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2021-02.parquet"))
df_green.show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|       2| 2021-02-01 07:34:03|  2021-02-01 07:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.

### For Hire Vehicle Tripdata February 2021

In [15]:
df_fhv = spark.read.parquet(wget.download("https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_2021-02.parquet"))
df_fhv.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 07:01:00|2021-02-01 08:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 07:55:40|2021-02-01 08:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 07:14:03|2021-02-01 07:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 07:27:48|2021-02-01 07:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 07:12:50|2021-02-01 07:26:38|        null|       225.0|   null|                B00037|
|              B00037|2021-02-01 07:00:3

## Union Yellow with Green Taxi Tripdata

In [28]:
df_yellow_col = df_yellow.withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
df_green_col = df_green.withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime').withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

In [29]:
common_columns = []

yellow_columns = set(df_yellow_col.columns)

for col in df_green_col.columns:
    if col in yellow_columns:
        common_columns.append(col)

In [30]:
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [31]:
from pyspark.sql import functions as F

In [32]:
green = df_green_col.select(common_columns).withColumn('service_type', F.lit('green'))
yellow = df_yellow_col.select(common_columns).withColumn('service_type', F.lit('yellow'))
df_taxi = green.unionAll(yellow)
df_taxi.show()

+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|    pickup_datetime|   dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+-------------------+-------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2|2021-02-01 07:34:03|2021-02-01 07:51:58|                 N|       1.0|         130|         205|            5.0|         3.66|       14.0|  0.5|    0.5|      10.0|         0.0|       

## Practice

### How many taxi trips were there on February 15?

In [35]:
df_q1 = df_taxi.withColumn('pickup_datetime',
                           F.to_date(df_taxi.pickup_datetime)).filter((df_taxi['pickup_datetime']>='2021-02-15')&(df_taxi['pickup_datetime']<'2021-02-16')).select('pickup_datetime').groupBy('pickup_datetime').count()
df_q1.show()

+---------------+-----+
|pickup_datetime|count|
+---------------+-----+
|     2021-02-15|45497|
+---------------+-----+



in 15 February there were 45497 trips

### Find the longest trips for each day

In [38]:
df_q2 = df_taxi.withColumn('pickup_datetime',
                           F.to_date(df_taxi.pickup_datetime)).select('pickup_datetime','trip_distance').filter(df_taxi['pickup_datetime']>='2021-02-01').groupBy('pickup_datetime').max('trip_distance').sort('pickup_datetime')
df_q2.show()

+---------------+------------------+
|pickup_datetime|max(trip_distance)|
+---------------+------------------+
|     2021-02-01|             38.89|
|     2021-02-02|             73.24|
|     2021-02-03|         186079.73|
|     2021-02-04|         102620.98|
|     2021-02-05|          91134.16|
|     2021-02-06|             48.35|
|     2021-02-07|         186510.67|
|     2021-02-08|         186617.92|
|     2021-02-09|          89416.24|
|     2021-02-10|           60382.7|
|     2021-02-11|          43174.56|
|     2021-02-12|          66659.27|
|     2021-02-13|          54381.65|
|     2021-02-14|         115928.92|
|     2021-02-15|             52.89|
|     2021-02-16|         221188.25|
|     2021-02-17|         140145.44|
|     2021-02-18|          29501.25|
|     2021-02-19|             75.81|
|     2021-02-20|         188054.03|
+---------------+------------------+
only showing top 20 rows



### Find Top 5 Most frequent `dispatching_base_num` ?

In [39]:
df_q3 = df_fhv.groupBy("dispatching_base_num").count().sort('count',ascending=False).limit(5)
df_q3.show()

+--------------------+-----+
|dispatching_base_num|count|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



### Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?

In [41]:
df_q4 = df_fhv.filter('PUlocationID is not NULL AND DOlocationID is not NULL').groupBy(['PUlocationID','DOlocationID']).count().sort('count',ascending=False).limit(5)
df_q4.show()

+------------+------------+-----+
|PUlocationID|DOlocationID|count|
+------------+------------+-----+
|       206.0|       206.0| 2374|
|       221.0|       206.0| 2112|
|       129.0|       129.0| 1902|
|         7.0|         7.0| 1829|
|       179.0|       179.0| 1736|
+------------+------------+-----+



#### Write all of the result to BigQuery table (additional - point plus)

In [None]:
df_q1.write.format('bigquery').option('table','cleaver-1.taxi.answer_1').save()

In [None]:
df_q2.write.format('bigquery').option('table','cleaver-1.taxi.answer_1').save()

In [None]:
df_q3.write.format('bigquery').option('table','cleaver-1.taxi.answer_1').save()

In [None]:
df_q4.write.format('bigquery').option('table','cleaver-1.taxi.answer_1').save()