# Spark FHV Trip
This notebook is a scratchpad for experiments while working on my assignment.

## Import Libs

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Initiate a local Spark session that uses all available cores.
# `SparkSession` comes from the imports cell, so it is not re-imported here.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("sparkxperiment")
    .getOrCreate()
)

22/12/11 09:41:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Display the active SparkSession object
spark

## Load Data

In [4]:
# Print the kernel's current working directory
!pwd

/home/jovyan/work


In [73]:
# Raw FHV trip records for February 2021.
# Parquet stores its schema inside the file, so no reader options such as the
# CSV-specific `header` flag are needed.
FHV_TRIP_PATH = "/home/jovyan/work/data/fhv_tripdata_2021-02.parquet"

df_fhv_trip = spark.read.parquet(FHV_TRIP_PATH)

In [74]:
# Preview the first 5 rows to inspect the columns and sample values
df_fhv_trip.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00013|2021-02-01 00:01:00|2021-02-01 01:33:00|        null|        null|   null|                B00014|
|     B00021         |2021-02-01 00:55:40|2021-02-01 01:06:20|       173.0|        82.0|   null|       B00021         |
|     B00021         |2021-02-01 00:14:03|2021-02-01 00:28:37|       173.0|        56.0|   null|       B00021         |
|     B00021         |2021-02-01 00:27:48|2021-02-01 00:35:45|        82.0|       129.0|   null|       B00021         |
|              B00037|2021-02-01 00:12:50|2021-02-01 00:26:38|        null|       225.0|   null|                B00037|
+--------------------+------------------

In [75]:
# Register the DataFrame as the temp view `fhv_trip` so spark.sql() queries can reference it
df_fhv_trip.createOrReplaceTempView('fhv_trip')

## 1. How many taxi trips were there on February 15?

### With Spark SQL

In [50]:
# Count trips whose pickup falls on 2021-02-15.
# Comparing the calendar date of the timestamp keeps the filter typed, instead
# of substring-matching the timestamp's implicit string form with LIKE.
df_result1_sql = spark.sql(
"""
SELECT count(*)
FROM fhv_trip
WHERE to_date(pickup_datetime) = DATE '2021-02-15'
""")

In [51]:
# Run the query and print the single-row count
df_result1_sql.show()



+--------+
|count(1)|
+--------+
|   34814|
+--------+



                                                                                

### With Spark

In [21]:
# Same question via the DataFrame API: count rows whose pickup timestamp falls
# on the calendar date 2021-02-15. No column projection is needed to count rows,
# and filtering on the full frame avoids referencing a column a prior select dropped.
df_result1_spark = (
    df_fhv_trip
    .filter(df_fhv_trip.pickup_datetime.cast('date') == '2021-02-15')
    .count()
)

                                                                                

In [22]:
# Number of trips picked up on 2021-02-15 (DataFrame API result)
df_result1_spark

34814

## 2. Find the longest trip (in hours) for each day

### With Spark

In [50]:
# Pickup calendar date, used as the grouping key for the per-day maximum.
date_col = F.to_date(df_fhv_trip.pickup_datetime)
# Display name for the date column in the result table.
org_col = 'Trip Date'

In [51]:
# Trip duration in hours: difference of the two timestamps in epoch seconds,
# divided by 3600.
trip_duration_hours = (
    F.col('dropOff_datetime').cast('long') - F.col('pickup_datetime').cast('long')
) / 3600

# Longest trip per pickup date.
# Output columns are named with alias() instead of renaming Spark's
# auto-generated names (e.g. `max(trip_duration)`). `org_col` stays a plain
# string, so this cell can be re-run without failing.
# NOTE(review): several daily maxima are far above 24 h — likely outliers or
# bad drop-off timestamps in the raw data; TODO confirm and filter.
df_temp = (
    df_fhv_trip
    .withColumn('trip_duration', F.round(trip_duration_hours, 2))
    .groupBy(date_col.alias(org_col))
    .agg(F.max('trip_duration').alias('Max Duration'))
    .orderBy(F.col(org_col).asc())
)

In [54]:
# Show up to 28 rows: one per day of February 2021 in this dataset
df_temp.show(28)



+----------+------------+
| Trip Date|Max Duration|
+----------+------------+
|2021-02-01|       771.5|
|2021-02-02|       23.18|
|2021-02-03|      667.25|
|2021-02-04|     1848.65|
|2021-02-05|       53.08|
|2021-02-06|        24.0|
|2021-02-07|       18.69|
|2021-02-08|      157.08|
|2021-02-09|       24.33|
|2021-02-10|       53.66|
|2021-02-11|       46.99|
|2021-02-12|        72.4|
|2021-02-13|      140.38|
|2021-02-14|       25.32|
|2021-02-15|       244.5|
|2021-02-16|       80.27|
|2021-02-17|       71.41|
|2021-02-18|       32.25|
|2021-02-19|       150.2|
|2021-02-20|       43.59|
|2021-02-21|       24.85|
|2021-02-22|      216.69|
|2021-02-23|      672.53|
|2021-02-24|        25.0|
|2021-02-25|      674.82|
|2021-02-26|       24.32|
|2021-02-27|      284.73|
|2021-02-28|      262.72|
+----------+------------+



                                                                                

## 3. Find the top 5 most frequent `dispatching_base_num` values

### With Spark SQL

In [23]:
# Top 5 dispatching bases by trip count.
# - Some values carry trailing spaces (e.g. "B00021         " in the preview),
#   so values are trimmed first; otherwise the same base is split across groups.
# - Ties on `freq` are broken by base number, so the LIMIT 5 cut is deterministic.
df_result3_sql = spark.sql(
"""
WITH trimmed AS (
    SELECT TRIM(dispatching_base_num) AS base_num
    FROM fhv_trip
    WHERE dispatching_base_num IS NOT NULL
)

SELECT base_num AS dispatching_base_num, COUNT(*) AS freq
FROM trimmed
GROUP BY base_num
ORDER BY freq DESC, dispatching_base_num
LIMIT 5
""")

In [24]:
# Print the top-5 dispatching bases and their trip counts
df_result3_sql.show()



+--------------------+-----+
|dispatching_base_num| freq|
+--------------------+-----+
|              B00856|35077|
|              B01312|33089|
|              B01145|31114|
|              B02794|30397|
|              B03016|29794|
+--------------------+-----+



                                                                                

In [56]:
# A bare DataFrame expression displays only its schema, not its rows
df_result3_sql

DataFrame[dispatching_base_num: string, freq: bigint]

## 4. Find the top 5 most common location pairs (`PUlocationID`, `DOlocationID`)

In [87]:
# Trips with both location IDs present; a row missing either ID cannot form a
# pickup/drop-off pair.
non_null_pairs_query = """
SELECT *
FROM fhv_trip
WHERE PUlocationID IS NOT NULL
  AND DOlocationID IS NOT NULL
"""
df_target = spark.sql(non_null_pairs_query)

In [88]:
# Name of the column holding the (PUlocationID, DOlocationID) pair as a struct.
pair_col = 'pickup_dropOff_pair'

# Trip count per location pair; `df_target` already excludes null location IDs.
df_temp = (
    df_target
    .withColumn(pair_col, F.struct(F.col('PUlocationID'), F.col('DOlocationID')))
    .groupBy(pair_col)
    .count()
)

# Most common pairs first. The pair itself breaks ties so the top-N is deterministic.
df_result = df_temp.orderBy(F.col('count').desc(), F.col(pair_col).asc())

In [89]:
# Print the 5 most common pickup/drop-off location pairs
df_result.show(5)



+-------------------+-----+
|pickup_dropOff_pair|count|
+-------------------+-----+
|     {206.0, 206.0}| 2374|
|     {221.0, 206.0}| 2112|
|     {129.0, 129.0}| 1902|
|         {7.0, 7.0}| 1829|
|     {179.0, 179.0}| 1736|
+-------------------+-----+
only showing top 5 rows



                                                                                

## 5. Write all results to a BigQuery table (optional — bonus points)

### Coming Soon...