In [8]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

## <b>Try a couple of ways to download the data<b>

### A. Dowload the data from the link on browser and read it using pandas

In [4]:
# Use this code to save parquet into csv
df_fhvhv = pd.read_parquet('fhvhv_tripdata_2021-02.parquet')
df_fhvhv.to_csv('fhvhv_tripdata_2021-02.parquet')

### B. Download using requst library

In [None]:
# imported the requests library
import requests
trip_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet"
  
# URL of the image to be downloaded is defined as image_url
trip_data = requests.get(trip_url) # create HTTP response object
with open("fhvhv_tripdata_2021-02.parquet",'wb') as fhvhv_taxi_trip:
  
    # Saving received content as a png file in
    # binary format
  
    # write the contents of the response (r.content)
    # to a new file in binary mode.
    fhvhv_taxi_trip.write(trip_data.content)

### C. Download using urlib.reqsuest library

In [37]:
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/fhvhv_tripdata_2021-02.parquet"
filename = "fhvhv_tripdata_2021-02.parquet"

urllib.request.urlretrieve(url, filename)

('fhvhv_tripdata_2021-02.parquet', <http.client.HTTPMessage at 0x21f9c76c6a0>)

In [43]:
df_fhvhv = pd.read_parquet('fhvhv_tripdata_2021-02.parquet')

In [44]:
df_fhvhv.shape

(11613942, 24)

In [45]:
df_fhvhv.head(1000)

Unnamed: 0,hvfhs_license_num,dispatching_base_num,originating_base_num,request_datetime,on_scene_datetime,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,trip_miles,...,sales_tax,congestion_surcharge,airport_fee,tips,driver_pay,shared_request_flag,shared_match_flag,access_a_ride_flag,wav_request_flag,wav_match_flag
0,HV0003,B02764,B02764,2021-01-31 23:59:00,2021-02-01 00:10:19,2021-02-01 00:10:40,2021-02-01 00:21:09,35,39,2.060,...,1.52,0.00,,0.00,9.79,N,N,,N,N
1,HV0003,B02764,B02764,2021-02-01 00:13:35,2021-02-01 00:25:23,2021-02-01 00:27:23,2021-02-01 00:44:01,39,35,3.150,...,2.85,0.00,,0.00,24.01,N,N,,N,N
2,HV0005,B02510,,2021-02-01 00:12:55,NaT,2021-02-01 00:28:38,2021-02-01 00:38:27,39,91,1.776,...,1.12,0.00,,0.00,6.91,N,N,N,N,N
3,HV0005,B02510,,2021-02-01 00:36:01,NaT,2021-02-01 00:43:37,2021-02-01 01:23:20,91,228,13.599,...,2.91,0.00,,7.00,35.05,N,N,N,N,N
4,HV0003,B02872,B02872,2021-01-31 23:57:50,2021-02-01 00:08:25,2021-02-01 00:08:42,2021-02-01 00:17:57,126,250,2.620,...,1.38,0.00,,0.00,8.53,N,N,,N,N
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,HV0003,B02872,B02872,2021-02-01 00:29:41,2021-02-01 00:35:34,2021-02-01 00:35:49,2021-02-01 00:40:59,79,144,0.640,...,1.82,2.75,,3.86,8.90,N,N,,N,N
996,HV0005,B02510,,2021-02-01 00:08:28,NaT,2021-02-01 00:16:35,2021-02-01 00:41:17,254,208,5.396,...,1.77,0.00,,0.00,18.40,N,N,N,N,N
997,HV0005,B02510,,2021-02-01 00:31:45,NaT,2021-02-01 00:42:49,2021-02-01 00:57:57,208,240,5.178,...,1.42,0.00,,0.00,13.35,N,N,N,N,N
998,HV0003,B02682,B02682,2021-02-01 00:06:31,2021-02-01 00:22:14,2021-02-01 00:23:08,2021-02-01 00:41:46,165,188,3.320,...,1.36,0.00,,0.00,15.97,N,N,,N,N


In [46]:
df_fhvhv.isnull().sum()

hvfhs_license_num              0
dispatching_base_num           0
originating_base_num     3319132
request_datetime               1
on_scene_datetime        3318817
pickup_datetime                0
dropoff_datetime               0
PULocationID                   0
DOLocationID                   0
trip_miles                     0
trip_time                      0
base_passenger_fare            0
tolls                          0
bcf                            0
sales_tax                      0
congestion_surcharge           0
airport_fee             11613181
tips                           0
driver_pay                     0
shared_request_flag            0
shared_match_flag              0
access_a_ride_flag             0
wav_request_flag               0
wav_match_flag                 0
dtype: int64

In [47]:
df_fhvhv.dtypes

hvfhs_license_num               object
dispatching_base_num            object
originating_base_num            object
request_datetime        datetime64[ns]
on_scene_datetime       datetime64[ns]
pickup_datetime         datetime64[ns]
dropoff_datetime        datetime64[ns]
PULocationID                     int64
DOLocationID                     int64
trip_miles                     float64
trip_time                        int64
base_passenger_fare            float64
tolls                          float64
bcf                            float64
sales_tax                      float64
congestion_surcharge           float64
airport_fee                    float64
tips                           float64
driver_pay                     float64
shared_request_flag             object
shared_match_flag               object
access_a_ride_flag              object
wav_request_flag                object
wav_match_flag                  object
dtype: object

# <b>Initate spark session<b>

In [70]:
# Create spark session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('tlc_taxi_feb2021') \
    .getOrCreate()

In [71]:
df_fhvhv_spark = spark.read.parquet("fhvhv_tripdata_2021-02.parquet")

In [102]:
df_fhvhv_spark.show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

# <b>The Task Question<b> 

### 1. How many taxi trips were there on February 15?

In [72]:
df_fhvhv_spark.createOrReplaceTempView("fhvhv_trips")

In [99]:
spark.sql("""
SELECT 
    COUNT(*) AS num_trips
FROM 
    fhvhv_trips
WHERE
    CAST(request_datetime AS date) = '2021-02-15'
""").show()

+---------+
|num_trips|
+---------+
|   424638|
+---------+



### 2. Find the longest trip for each day ?

In [103]:
# Add condition to pick non null value on request_datetime because there is 1 null value
spark.sql("""
SELECT 
    CAST(request_datetime AS date) AS date, MAX(trip_miles) AS max_trip_distance
FROM 
    fhvhv_trips
WHERE 
    CAST(request_datetime AS date) IS NOT NULL
GROUP BY 
    CAST(request_datetime AS date)
ORDER BY 
    CAST(request_datetime AS date) ASC 
""").show()

+----------+-----------------+
|      date|max_trip_distance|
+----------+-----------------+
|2021-02-01|           212.43|
|2021-02-02|           282.78|
|2021-02-03|           184.26|
|2021-02-04|           203.97|
|2021-02-05|           245.35|
|2021-02-06|           275.32|
|2021-02-07|           216.36|
|2021-02-08|            253.5|
|2021-02-09|           480.73|
|2021-02-10|            512.5|
|2021-02-11|           240.66|
|2021-02-12|           250.11|
|2021-02-13|           226.24|
|2021-02-14|           207.44|
|2021-02-15|          173.582|
|2021-02-16|          307.661|
|2021-02-17|           324.19|
|2021-02-18|           527.11|
|2021-02-19|           224.33|
|2021-02-20|           329.16|
+----------+-----------------+
only showing top 20 rows



### 3. Find Top 5 Most frequent `dispatching_base_num` ?

In [109]:
spark.sql("""
SELECT 
    dispatching_base_num, count(*) AS total_dispatch_num
FROM 
    fhvhv_trips
GROUP BY 
    dispatching_base_num
ORDER BY
    total_dispatch_num DESC
LIMIT 5
""").show()

+--------------------+------------------+
|dispatching_base_num|total_dispatch_num|
+--------------------+------------------+
|              B02510|           3233664|
|              B02764|            965568|
|              B02872|            882689|
|              B02875|            685390|
|              B02765|            559768|
+--------------------+------------------+



### 4. Find Top 5 Most common location pairs (PUlocationID and DOlocationID) ?

In [108]:
spark.sql("""
SELECT 
    PULocationID, DOlocationID, count(*) AS total_num_PO_DO_pairs
FROM 
    fhvhv_trips
GROUP BY 
    PUlocationID, 
    DOlocationID
ORDER BY
    total_num_PO_DO_pairs DESC
LIMIT 5
""").show()

+------------+------------+---------------------+
|PULocationID|DOlocationID|total_num_PO_DO_pairs|
+------------+------------+---------------------+
|          76|          76|                45041|
|          26|          26|                37329|
|          39|          39|                28026|
|          61|          61|                25976|
|          14|          14|                17934|
+------------+------------+---------------------+

