# Optimization of NYC Taxi Driver Earnings Using Big Data Analytics

## Part 1: Data Ingestion and Preparation

### Loading files

In [None]:
#Loading libraries
from pyspark.sql.functions import input_file_name,regexp_extract
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

In [None]:
#Load all files in yellow folder.
dbutils.fs.ls("/FileStore/yellow/")

Out[1]: [FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2015.parquet', name='yellow_taxi_2015.parquet', size=2896588873, modificationTime=1695883950000),
 FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2016.parquet', name='yellow_taxi_2016.parquet', size=2616839324, modificationTime=1695884103000),
 FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2017.parquet', name='yellow_taxi_2017.parquet', size=2072273391, modificationTime=1695884235000),
 FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2018.parquet', name='yellow_taxi_2018.parquet', size=2098068908, modificationTime=1695884365000),
 FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2019.parquet', name='yellow_taxi_2019.parquet', size=1788371641, modificationTime=1695884481000),
 FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2020.parquet', name='yellow_taxi_2020.parquet', size=533759081, modificationTime=1695884530000),
 FileInfo(path='dbfs:/FileStore/yellow/yellow_taxi_2021.parquet', name='yellow_taxi_2021.parquet', 

In [None]:
#Load all files in green folder.
dbutils.fs.ls("/FileStore/green/")

Out[2]: [FileInfo(path='dbfs:/FileStore/green/green_taxi_2015.parquet', name='green_taxi_2015.parquet', size=404556105, modificationTime=1695883693000),
 FileInfo(path='dbfs:/FileStore/green/green_taxi_2016.parquet', name='green_taxi_2016.parquet', size=346731598, modificationTime=1695883717000),
 FileInfo(path='dbfs:/FileStore/green/green_taxi_2017.parquet', name='green_taxi_2017.parquet', size=251504885, modificationTime=1695883735000),
 FileInfo(path='dbfs:/FileStore/green/green_taxi_2018.parquet', name='green_taxi_2018.parquet', size=193874396, modificationTime=1695883748000),
 FileInfo(path='dbfs:/FileStore/green/green_taxi_2019.parquet', name='green_taxi_2019.parquet', size=142163406, modificationTime=1695883759000),
 FileInfo(path='dbfs:/FileStore/green/green_taxi_2020.parquet', name='green_taxi_2020.parquet', size=37076741, modificationTime=1695883766000),
 FileInfo(path='dbfs:/FileStore/green/green_taxi_2021.parquet', name='green_taxi_2021.parquet', size=23479865, modification

### Reading files and save them into database df_green and df_yellow

In [None]:
df_yellow = spark.read.option("header", True).parquet("/FileStore/yellow/").withColumn("fileyear", regexp_extract(input_file_name(), r'(\d{4})\.parquet', 1))


In [None]:
df_green = spark.read.option("header", True).parquet("/FileStore/green/").withColumn("fileyear", regexp_extract(input_file_name(), r'(\d{4})\.parquet', 1))

### Count the total numbers of rows for each taxi colour 

In [None]:
# Count numbers of rows for yellow taxi
df_yellow.count()

In [None]:
# Count numbers of rows for green taxi
df_green.count()

### Convert the “Green” 2015 parquet into a csv file and compare files size.

In [None]:
green_2015 = spark.read.parquet("/FileStore/green/green_taxi_2015.parquet")
green_2015.write.option("header","true").csv("/FileStore/df_green_2015.csv")

In [None]:
# Check the csv file size
dbutils.fs.ls("/FileStore/df_green_2015.csv")

Out[8]: [FileInfo(path='dbfs:/FileStore/df_green_2015.csv/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1695900225000),
 FileInfo(path='dbfs:/FileStore/df_green_2015.csv/_committed_7694727312407993374', name='_committed_7694727312407993374', size=288, modificationTime=1695900225000),
 FileInfo(path='dbfs:/FileStore/df_green_2015.csv/_started_7694727312407993374', name='_started_7694727312407993374', size=0, modificationTime=1695899973000),
 FileInfo(path='dbfs:/FileStore/df_green_2015.csv/part-00000-tid-7694727312407993374-731f9a44-4b83-43f5-97bd-766d09d39918-41-1-c000.csv', name='part-00000-tid-7694727312407993374-731f9a44-4b83-43f5-97bd-766d09d39918-41-1-c000.csv', size=718297967, modificationTime=1695900196000),
 FileInfo(path='dbfs:/FileStore/df_green_2015.csv/part-00001-tid-7694727312407993374-731f9a44-4b83-43f5-97bd-766d09d39918-42-1-c000.csv', name='part-00001-tid-7694727312407993374-731f9a44-4b83-43f5-97bd-766d09d39918-42-1-c000.csv', size=716001938, modificationTime=169

File size for csv files are sum of 718297967, 716001938 and 744817515

In [None]:
# Check the parquet file size
dbutils.fs.ls("/FileStore/green/green_taxi_2015.parquet")

Out[9]: [FileInfo(path='dbfs:/FileStore/green/green_taxi_2015.parquet', name='green_taxi_2015.parquet', size=404556105, modificationTime=1695883693000)]

File size for parquet files is 404556105 which is much smaller than csv file format

### Data cleaning to remove unrealistic trips

In [None]:
df_green_cleaned=df_green
df_yellow_cleaned=df_yellow

#### Filter 1: Remove trips finishing before the starting time

In [None]:
# Using subtract to remove a dataframe inside a dataframe.
filter_1_green= (F.col("lpep_dropoff_datetime")>=F.col("lpep_pickup_datetime"))

In [None]:
filter_1_yellow= (F.col("tpep_dropoff_datetime")>=F.col("tpep_pickup_datetime"))

#### Filter 2: Remove trips where the pickup/dropoff datetime is outside of the range

In [None]:
#Pick up date out side of range
filter_2_green= (F.year(F.col("lpep_pickup_datetime"))==F.col("fileyear").cast("int"))
filter_2_2_green= (F.year(F.col("lpep_dropoff_datetime"))==F.col("fileyear").cast("int"))

In [None]:
filter_2_yellow= (F.year(F.col("tpep_pickup_datetime"))==F.col("fileyear").cast("int"))
filter_2_2_yellow= (F.year(F.col("tpep_dropoff_datetime"))==F.col("fileyear").cast("int"))

#### Filter 3: Remove trips with negative speed

In [None]:
# distant in miles is converted to km and the time is convert to hour. The measurement for speeding is kms
df_green_cleaned=df_green_cleaned.withColumn("distance_km",F.expr("trip_distance*1.609344"))
# Remove mile distance
df_green_cleaned=df_green_cleaned.drop("trip_distance")
df_green_cleaned=df_green_cleaned.withColumn("speed",F.expr("distance_km/((unix_timestamp(lpep_dropoff_datetime)-unix_timestamp(lpep_pickup_datetime))/3600)"))

In [None]:
# distant in miles is converted to km and the time is convert to hour. The measurement for speeding is kms
df_yellow_cleaned=df_yellow_cleaned.withColumn("distance_km",F.expr("trip_distance*1.609344"))
# Remove mile distance
df_yellow_cleaned=df_yellow_cleaned.drop("trip_distance")
df_yellow_cleaned=df_yellow_cleaned.withColumn("speed",F.expr("distance_km/((unix_timestamp(tpep_dropoff_datetime)-unix_timestamp(tpep_pickup_datetime))/3600)"))

In [None]:
# Remove trips that speed smaller than 0
filter_3=(F.col("speed")>=0)

#### Filter 4: Remove trips with very high speed with greater than 88.5 km/h

In [None]:
filter_4= (F.col("speed")<=88.5)

#### Filter 5: Remove trips that are travelling too short or too long (duration wise)

By using Google map to find longest distant from brooklyn to bronx, it can be see that it only took 1 and 30 minute. So I claimed that the longest travel is no longer than 3 hours

In [None]:
filter_5_green=(F.expr("(unix_timestamp(lpep_dropoff_datetime)-unix_timestamp(lpep_pickup_datetime))/3600")<=3)

In [None]:
filter_5_yellow=(F.expr("(unix_timestamp(tpep_dropoff_datetime)-unix_timestamp(tpep_pickup_datetime))/3600")<=3)

In [None]:
#Remove trips that lower than 1 minutes.
filter_5_2_green= (F.expr("unix_timestamp(lpep_dropoff_datetime)-unix_timestamp(lpep_pickup_datetime)")>60)

In [None]:
#Remove trips that lower than 1 minutes.
filter_5_2_yellow= (F.expr("unix_timestamp(tpep_dropoff_datetime)-unix_timestamp(tpep_pickup_datetime)")>60)

#### Filter 6: Remove trips that are travelling too short or too long (distance wise)

By using Google map to find longest distant from brooklyn to bronx, it can be see that the distance is 50 km. So I claimed that the longest travel is no longer than 100km

In [None]:
filter_6= (F.col("distance_km")<=100)

In [None]:
# For too short distance, it should not shorter than 0.5km
filter_6_2= (F.col("distance_km")>0.5)

#### Filter 7: Remove trips with very low speed with lower than 5 km/h

In [None]:
filter_7= (F.col("speed")>=5)

#### Filter 8: Remove trips that are total money charge is less than 0.

In [None]:
filter_8= (F.col("total_amount")>0)

#### Remove duplicated rows

In [None]:
#Remove duplicates records in green data
df_green_cleaned=df_green_cleaned.dropDuplicates()

In [None]:
#Remove duplicates records in yellow data
df_yellow_cleaned=df_yellow_cleaned.dropDuplicates()

#### Apply all filters for two given datasets

In [None]:
#Apply all filter above to green data
df_green_cleaned=df_green_cleaned.filter(filter_1_green).filter(filter_2_green).filter(filter_2_2_green).filter(filter_3).filter(filter_4).filter(filter_5_green).filter(filter_5_2_green).filter(filter_6).filter(filter_7).filter(filter_8)

In [None]:
#Apply all filter above to yellow data
df_yellow_cleaned=df_yellow_cleaned.filter(filter_1_yellow).filter(filter_2_yellow).filter(filter_2_2_yellow).filter(filter_3).filter(filter_4).filter(filter_5_yellow).filter(filter_5_2_yellow).filter(filter_6).filter(filter_7).filter(filter_8)

### Rename and Reorder datasets and then combines them

In [None]:
#Rename pick up and drop off for green taxis
df_green_cleaned=df_green_cleaned.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
df_green_cleaned=df_green_cleaned.withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

In [None]:
#Rename pick up and drop off for yellow taxis
df_yellow_cleaned=df_yellow_cleaned.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
df_yellow_cleaned=df_yellow_cleaned.withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

In [None]:
# Add colour to green dataset and a null records for airport fee.
df_green_cleaned=df_green_cleaned.withColumn("colour", F.lit("green"))
df_green_cleaned=df_green_cleaned.withColumn("airport_fee", F.lit(None).cast("int"))

In [None]:
# Add colour to yellow dataset and a null records for ehail fee and trip type.
df_yellow_cleaned=df_yellow_cleaned.withColumn("colour", F.lit("yellow"))
df_yellow_cleaned=df_yellow_cleaned.withColumn("ehail_fee", F.lit(None).cast("int"))
df_yellow_cleaned=df_yellow_cleaned.withColumn("trip_type", F.lit(None).cast("string"))

In [None]:
#Reorder green datasets columns
df_green_cleaned = df_green_cleaned.select(*['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'fileyear',
 'distance_km',
 'speed',
 'colour',
 'ehail_fee',
 'trip_type',
 'airport_fee'
 ])

In [None]:
#Reorder yellow datasets columns
df_yellow_cleaned = df_yellow_cleaned.select(*[ 
 'VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'fileyear',
 'distance_km',
 'speed',
 'colour',
 'ehail_fee',
 'trip_type',
 'airport_fee'
])

Export dataset for green taxis to a parquet file

In [None]:
df_green_cleaned.write.parquet("/FileStore/df_green_cleaned.parquet")

Export dataset for yellow taxis to a parquet file

In [None]:
df_yellow_cleaned.write.parquet("/FileStore/df_yellow_cleaned.parquet")

Combine two dataset into one dataset

In [None]:
df_combined=spark.read.parquet("/FileStore/df_green_cleaned.parquet").union(spark.read.parquet("/FileStore/df_yellow_cleaned.parquet"))

#### Combine the new dataframe with the location data

In [None]:
#Read the locations file
taxi_zone_lookup = spark.read.csv("/FileStore/taxi_zone_lookup.csv", header=True)

In [None]:
#Rename location id to pick up location id
taxi_zone_lookup=taxi_zone_lookup.withColumnRenamed("LocationID", "PULocationID")

In [None]:
#Left join the combine dataset vs location file by pickup location id column
df_combined = df_combined.join(taxi_zone_lookup, on="PULocationID", how="left")

In [None]:
#Rename location columns to pick up location columns
df_combined=df_combined.withColumnRenamed("Borough", "PUBorough")
df_combined=df_combined.withColumnRenamed("Zone", "PUZone")
df_combined=df_combined.withColumnRenamed("service_zone", "PUservice_zone")

In [None]:
#Change the pickuplocationid to drop off locatoon in locations file
taxi_zone_lookup=taxi_zone_lookup.withColumnRenamed("PULocationID", "DOLocationID")

In [None]:
#Left join the combine dataset vs location file by drop off location id column
df_combined = df_combined.join(taxi_zone_lookup, on="DOLocationID", how="left")

In [None]:
#Rename location columns to drop off location columns
df_combined=df_combined.withColumnRenamed("Borough", "DOBorough")
df_combined=df_combined.withColumnRenamed("Zone", "DOZone")
df_combined=df_combined.withColumnRenamed("service_zone", "DOservice_zone")

In [None]:
#Save the final dataset as df_taxi_final
df_combined.write.parquet("/FileStore/df_taxi_final.parquet")

## Part 2: Business Insights

In [None]:
#Read the final dataset
df_final=spark.read.parquet("/FileStore/df_taxi_final.parquet")

In [None]:
#Create a view call NYC_taxi for part 2
df_final.createOrReplaceTempView("NYC_taxi")

In [None]:
#Count the total rows
spark.sql('''SELECT COUNT(*) FROM NYC_taxi''').show()

+---------+
| count(1)|
+---------+
|710164711|
+---------+



### Statistical summaries for each year and month

In [None]:
spark.sql('''
SELECT 
CONCAT(YEAR(pickup_datetime),'-',LPAD(MONTH(pickup_datetime),2,'0')) AS year_month, -- Convert timestamp to year and month then combine them.
COUNT(*) AS Trip_numbers,-- count amount of rows
MODE(date_format(pickup_datetime,'EEEE')) AS Day_of_week, -- Convert timestamp to day of week where EEEE mean shows in full text
MODE(HOUR(pickup_datetime)) AS Hour_of_day,-- Convert timestamp to hour of day
AVG(passenger_count) AS Average_passenger,-- Average passenger count
AVG(total_amount) AS Average_amount_paid_per_trip, -- Average amount paid per trip
AVG(total_amount/passenger_count) AS Average_amount_paid_per_passenger -- Average amount paid per passenger
FROM 
NYC_taxi
GROUP BY 
YEAR(pickup_datetime), MONTH(pickup_datetime)
ORDER BY 
year_month
'''
).show()

+----------+------------+-----------+-----------+------------------+----------------------------+---------------------------------+
|year_month|Trip_numbers|Day_of_week|Hour_of_day| Average_passenger|Average_amount_paid_per_trip|Average_amount_paid_per_passenger|
+----------+------------+-----------+-----------+------------------+----------------------------+---------------------------------+
|   2015-01|    14013391|   Saturday|         19|1.6531349193068259|          15.053532772269476|               12.370529982728575|
|   2015-02|    13746678|   Saturday|         19|1.6398554618068453|          15.259085646662706|               12.530163944336808|
|   2015-03|    14755224|     Sunday|         19|1.6400066850899722|          15.682274736712088|               12.849473606365931|
|   2015-04|    14424813|   Thursday|         19|1.6456055964122378|          15.873917839324294|               12.986156938149938|
|   2015-05|    14602446|   Saturday|         19|1.6527965246370369|        

### Statistical summaries for each taxi color

In [None]:
spark.sql('''
SELECT 
colour AS Taxi_colour,
AVG(round((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0, 2)) AS Average_duration_minutes,-- Average trip duration in minutes and round to 2 decimal
MEDIAN(round((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0, 2)) AS Median_duration_minutes,-- Midian trip duration in minutes and round to 2 decimal
MIN(round((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0, 2)) AS Minimum_duration_minutes,-- Minimum trip duration in minutes and round to 2 decimal
MAX(round((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0, 2)) AS Maximum_duration_minutes,-- Maximum trip duration in minutes and round to 2 decimal
AVG(distance_km) AS Average_distance_km,-- Average distance by kilometer
MEDIAN(distance_km) AS Median_distance_km,-- Median distance by kilometer
MIN(distance_km) AS Minimum_distance_km,-- Minimum distance by kilometer
MAX(distance_km) AS Maximum_distance_km,-- Maximum distance by kilometer
AVG(speed) AS Average_speed_kmh,-- Average speed by kilometer per hour
MEDIAN(speed) AS Median_speed_kmh,-- Median speed by kilometer per hour
MIN(speed) AS Minimum_speed_kmh,-- Minimum speed by kilometer per hour
MAX(speed) AS Maximum_speed_kmh-- Maximum speed by kilometer per hour
FROM 
NYC_taxi
GROUP BY 
colour
ORDER BY 
colour
'''
).show()

+-----------+------------------------+-----------------------+------------------------+------------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+-----------------+-----------------+
|Taxi_colour|Average_duration_minutes|Median_duration_minutes|Minimum_duration_minutes|Maximum_duration_minutes|Average_distance_km|Median_distance_km|Minimum_distance_km|Maximum_distance_km| Average_speed_kmh|  Median_speed_kmh|Minimum_speed_kmh|Maximum_speed_kmh|
+-----------+------------------------+-----------------------+------------------------+------------------------+-------------------+------------------+-------------------+-------------------+------------------+------------------+-----------------+-----------------+
|      green|               13.971448|                  10.72|                    1.02|                  179.98|  4.940264679059886|         3.1382208|         0.09656064|        99.95635584| 20.3175358

### Statistical summaries for each taxi color (yellow and green), each pair of pick up and drop off boroughs locations, each month, each day of week and each hour

In [None]:
spark.sql('''
SELECT 
colour AS Taxi_colour,
PUBorough AS Pick_up_location,
DOBorough AS Drop_off_location,
LPAD(MONTH(pickup_datetime),2,'0') AS Month_of_year,-- Extract month from a timestamp
date_format(pickup_datetime,'EEEE') AS Day_of_week,-- Extract day from a timestamp
HOUR(pickup_datetime) AS Hour_of_day,-- Extract hour from a timestamp
COUNT(*) AS Trip_numbers,-- Count number of trips
AVG(distance_km) AS Average_distance,-- Average distance per trip in kilometer
AVG(total_amount) AS Average_amount_paid_per_trip,-- Average amount paid per trip
SUM(total_amount) AS Total_amount_paid -- Total number of money has paid
FROM 
NYC_taxi
GROUP BY 
Taxi_colour,Pick_up_location,Drop_off_location,Month_of_year, Day_of_week, Hour_of_day
ORDER BY 
Taxi_colour,Pick_up_location,Drop_off_location,Month_of_year, Day_of_week, Hour_of_day
'''
).show()

+-----------+----------------+-----------------+-------------+-----------+-----------+------------+------------------+----------------------------+------------------+
|Taxi_colour|Pick_up_location|Drop_off_location|Month_of_year|Day_of_week|Hour_of_day|Trip_numbers|  Average_distance|Average_amount_paid_per_trip| Total_amount_paid|
+-----------+----------------+-----------------+-------------+-----------+-----------+------------+------------------+----------------------------+------------------+
|      green|           Bronx|            Bronx|           01|     Friday|          0|         931|3.7668675588399565|          11.075617615467218| 10311.39999999998|
|      green|           Bronx|            Bronx|           01|     Friday|          1|         829|  3.74027852583836|          11.248419782870906|  9324.93999999998|
|      green|           Bronx|            Bronx|           01|     Friday|          2|         616|3.9220183542857145|          11.832581168831146| 7288.869999999986

### The percentage of trips where drivers received tips

In [None]:
spark.sql('''
SELECT 
(SUM(CASE WHEN tip_amount>0 THEN 1 ELSE 0 END)/COUNT(*))*100 AS percentage_received_tip -- Count the number of trips that recieved tips over all trips
FROM 
NYC_taxi
'''
).show()

+-----------------------+
|percentage_received_tip|
+-----------------------+
|      63.70771526550831|
+-----------------------+



### The proportion of drivers who received tips amounting to at least $5 per trip

In [None]:
spark.sql('''
SELECT 
(SUM(CASE WHEN tip_amount>=5 THEN 1 ELSE 0 END)/SUM(CASE WHEN tip_amount>0 THEN 1 ELSE 0 END))*100 AS percentage_received_5dollars_tip -- Count the number of trips that recieved $5 tips over all trips that recieved tips
FROM 
NYC_taxi
'''
).show()

+--------------------------------+
|percentage_received_5dollars_tip|
+--------------------------------+
|              12.287222418318981|
+--------------------------------+



### Statistical summaries for each trip into bins of durations

In [None]:
spark.sql('''
SELECT
AVG(speed) AS Average_speed, --Average speed
AVG(distance_km/total_amount) AS Average_distance_per_dollar, --Average distance per dollar
CASE --Create bins of trip durations
WHEN (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60.0 <= 5 THEN 'Under 5 Mins'
WHEN (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60.0 > 5 AND (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0 <= 10 THEN 'From 5 mins to 10 mins'
WHEN (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60.0 > 10 AND (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0 <= 20 THEN 'From 10 mins to 20 mins'
WHEN (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60.0 > 20 AND (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0 <= 30 THEN 'From 20 mins to 30 mins'
WHEN (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/60.0 > 30 AND (unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime))/ 60.0 <= 60 THEN 'From 30 mins to 60 mins'
ELSE 'At least 60 mins'
END AS bins_of_durations
FROM 
NYC_taxi
GROUP BY
bins_of_durations
ORDER BY
Average_distance_per_dollar 
'''
).show()


+------------------+---------------------------+--------------------+
|     Average_speed|Average_distance_per_dollar|   bins_of_durations|
+------------------+---------------------------+--------------------+
| 19.60273018813133|        0.16154092502199857|        Under 5 Mins|
|17.091168688731226|        0.21025061560854924|From 5 mins to 10...|
|17.878275341594218|        0.26013598338870625|From 10 mins to 2...|
| 21.53802559607799|         0.3112550310469862|From 20 mins to 3...|
|   25.969016947008|         0.3774536957353038|From 30 mins to 6...|
|23.014794305077874|         0.5334452551560225|    At least 60 mins|
+------------------+---------------------------+--------------------+



## Part 3: Build different ML models using Spark ML pipelines + a baseline model to predict the total amount of a trip

### Data loading

In [None]:
#Loading final dataset
df_final=spark.read.parquet("/FileStore/df_taxi_final.parquet")

In [None]:
# Add new columns such as month, day and hour to final dataset 
df_final=df_final.withColumn("month_of_year",F.month(F.col("pickup_datetime")))
df_final=df_final.withColumn('day_of_week',F.dayofweek(F.col("pickup_datetime")))
df_final=df_final.withColumn('hour_of_day',F.hour(F.col("pickup_datetime")))

In [None]:
#Show columns with number of null records
NC = []
for column in df_final.columns:
    NC = df_final.filter(F.col(column).isNull()).count()
    NC.append((column, NC))
count_df = spark.createDataFrame(NC,["Column", "Null_Count"])

+--------------------+----------+
|              Column|Null_Count|
+--------------------+----------+
|        DOLocationID|         0|
|        PULocationID|         0|
|            VendorID|         0|
|     pickup_datetime|         0|
|    dropoff_datetime|         0|
|  store_and_fwd_flag|   5611640|
|          RatecodeID|   5611640|
|     passenger_count|   5611640|
|         fare_amount|         0|
|               extra|         0|
|             mta_tax|         0|
|          tip_amount|         0|
|        tolls_amount|         0|
|improvement_surch...|         4|
|        total_amount|         0|
|        payment_type|   1745545|
|congestion_surcharge| 537083283|
|            fileyear|         0|
|         distance_km|         0|
|               speed|         0|
+--------------------+----------+
only showing top 20 rows



In [None]:
display(count_df)

Column,Null_Count
DOLocationID,0
PULocationID,0
VendorID,0
pickup_datetime,0
dropoff_datetime,0
store_and_fwd_flag,5611640
RatecodeID,5611640
passenger_count,5611640
fare_amount,0
extra,0


In [None]:
#Drop null columns and columns not using for modeing
columns_drop=["VendorID","DOLocationID","PULocationID","pickup_datetime","dropoff_datetime","store_and_fwd_flag","passenger_count","fare_amount","payment_type","congestion_surcharge","ehail_fee","trip_type","airport_fee","PUZone","PUservice_zone","DOZone","DOservice_zone"]
df_final = df_final.drop(*columns_drop)

Get data from Part 2

In [None]:
df_p2q3c = df_final.groupBy('colour','PUBorough','DOBorough','month_of_year','day_of_week','hour_of_day').agg(F.avg('total_amount').alias('Average_amount_paid_per_trip'))
df_p2q3c.show(5)

+------+---------+---------+-------------+-----------+-----------+----------------------------+
|colour|PUBorough|DOBorough|month_of_year|day_of_week|hour_of_day|Average_amount_paid_per_trip|
+------+---------+---------+-------------+-----------+-----------+----------------------------+
| green|Manhattan|Manhattan|            5|          6|         22|           12.00181321270344|
| green| Brooklyn| Brooklyn|            5|          7|         12|          13.075475733479434|
| green|   Queens|Manhattan|            5|          1|         10|            26.0941881100267|
| green|    Bronx|    Bronx|            5|          2|         22|           11.96380361173816|
| green|Manhattan|    Bronx|            5|          1|          6|          15.531378205128188|
+------+---------+---------+-------------+-----------+-----------+----------------------------+
only showing top 5 rows



### Data Processing

In [None]:
#Drop all null records
df_final=df_final.na.drop()
df_final.count()

Out[16]: 704553067

Join Part 2 dataset and combined dataset(10/11/12-2022 only)

In [None]:
Q1_dataset=df_final.filter(F.col("month_of_year").isin([10, 11, 12])&((F.col("fileyear") == '2022')))
Q1_dataset.show(5)

+----------+-----+-------+----------+------------+---------------------+------------+--------+------------------+------------------+------+---------+---------+-------------+-----------+-----------+
|RatecodeID|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|fileyear|       distance_km|             speed|colour|PUBorough|DOBorough|month_of_year|day_of_week|hour_of_day|
+----------+-----+-------+----------+------------+---------------------+------------+--------+------------------+------------------+------+---------+---------+-------------+-----------+-----------+
|       1.0|  0.0|    0.5|       0.0|         0.0|                  0.3|         5.8|    2022|0.6276441600000001|7.8184047612456755| green|Manhattan|Manhattan|           12|          5|         10|
|       1.0|  0.0|    0.5|      5.45|         0.0|                  0.3|       27.25|    2022| 7.612197120000001|16.568264590084645| green| Brooklyn| Brooklyn|           12|          5|         11|
|       1.

In [None]:
#Join octember/november/december 2022 dataset vs dataset from part 2
Q1_dataset = Q1_dataset.join(df_p2q3c,['colour','PUBorough','DOBorough','month_of_year','day_of_week','hour_of_day'], how="left")

In [None]:
#Saving part 3 file for baseline
Q1_dataset.write.parquet("/FileStore/p3q1_dataset.parquet")

In [None]:
#Drop locations columns they are unused
columns_drop=["PUBorough","DOBorough"]
df_final = df_final.drop(*columns_drop)

RateCodeID from 1 to 6, but it does not meaning for higher rank of final rate. Therefore, it will be convert back to category and then used for one hot encoding.

In [None]:
df_final=df_final.withColumn("RatecodeID", F.col("RatecodeID").cast("string"))

Create a variable called cat_cols that contains the following values: 'RatecodeID','colour'

In [None]:
cat_cols = ['RatecodeID','colour']

Create pipeline

In [None]:
#Import OneHotEncoder, StringIndexer, VectorAssembler from pyspark.ml.feature
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

In [None]:
#Create an empty list called stages
stages = []

In [None]:
#Iterate through cat_cols and instantiate StringIndexer and OneHotEncoder for each column and them to stages
for cat_col in cat_cols:
    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")
    col_encoder = OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"])
    stages += [col_indexer, col_encoder]

In [None]:
#Create a variable called num_cols that contains the following values:
num_cols = [ 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'distance_km',
 'speed',
 'month_of_year',
 'day_of_week',
 'hour_of_day']

In [None]:
#Create a new list called cat_cols_ohe that will add the suffix _ohe to each element of cat_cols
cat_cols_ohe = [f"{cat_col}_ohe" for cat_col in cat_cols]

In [None]:
#Instantiate a VectorAssembler with inputCols=cat_cols_ohe + num_cols, outputCol="features". Save it into a variable called assembler
assembler = VectorAssembler(inputCols=cat_cols_ohe + num_cols, outputCol="features")

In [None]:
#Add assembler to stages
stages += [assembler]

In [None]:
#Import Pipeline from pyspark.ml
from pyspark.ml import Pipeline

In [None]:
#Instantiate a Pipeline with stages. Save it into a variable called pipeline
pipeline = Pipeline(stages=stages)

In [None]:
#Fit the pipeline with df_final
pipeline_model = pipeline.fit(df_final)

In [None]:
#Apply the pipeline to df_final
df_final = pipeline_model.transform(df_final)

In [None]:
df_final.show(5)

+----------+-----+-------+----------+------------+---------------------+------------+--------+------------------+------------------+------+-------------+-----------+-----------+--------------+--------------+----------+----------+--------------------+
|RatecodeID|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|fileyear|       distance_km|             speed|colour|month_of_year|day_of_week|hour_of_day|RatecodeID_ind|RatecodeID_ohe|colour_ind|colour_ohe|            features|
+----------+-----+-------+----------+------------+---------------------+------------+--------+------------------+------------------+------+-------------+-----------+-----------+--------------+--------------+----------+----------+--------------------+
|       1.0|  0.5|    0.5|       0.0|         0.0|                  0.3|         8.3|    2015|2.5749504000000005| 23.52746558375635| green|            5|          6|          0|           0.0| (6,[0],[1.0])|       1.0| (1,[],[])|(18,[0,7,8,11,12,.

In [None]:
df_final = df_final.select(['features','month_of_year','fileyear','total_amount'])

Extract testing dataset in October/NovemberDecember 2022

In [None]:
df_final.filter(F.col("month_of_year").isin([10, 11, 12])&(F.col("fileyear") == '2022')).drop(*["month_of_year","fileyear"]).write.parquet("/FileStore/testing_dataset.parquet")

Split dataset into trainning and validating set which excepts October/NovemberDecember 2022

In [None]:
#Split training_dataset into training and validating sets with a 80-20 ratio. Set the seed to 8
p3q2_dataset=df_final.filter(~(F.col("month_of_year").isin([10, 11, 12])&(F.col("fileyear") == '2022'))).drop(*["month_of_year","fileyear"]).write.parquet("/FileStore/p3q2_dataset.parquet")

In [None]:
#Split df_cleaned into training and testing sets with a 80-20 ratio. Set the seed to 8
p3q2_dataset=spark.read.parquet("/FileStore/p3q2_dataset.parquet")
p3q2_dataset.cache()
train_data, valid_data = p3q2_dataset.randomSplit([0.8, 0.2], seed=8)

In [None]:
#Saving training and validating datasets
valid_data.write.parquet("/FileStore/valid_data.parquet")
train_data.write.parquet("/FileStore/train_data.parquet")

In [None]:
#Loading training, validating and testing datasets
testing_dataset=spark.read.parquet("/FileStore/testing_dataset.parquet")
validating_dataset=spark.read.parquet("/FileStore/valid_data.parquet")
training_dataset=spark.read.parquet("/FileStore/train_data.parquet")

In [None]:
#Remove ouliers that greater than 300 dollars in validating dataset.
validating_dataset.filter(F.col("total_amount")<=300).write.parquet("/FileStore/valid_data_cleaned.parquet",mode='overwrite')

In [None]:
#Remove ouliers that greater than 300 dollars in training dataset.
training_dataset.filter(F.col("total_amount")<=300).write.parquet("/FileStore/train_data_cleaned.parquet",mode='overwrite')

### Baseline model-Calculate its RMSE.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
p3q1_dataset=spark.read.parquet("/FileStore/p3q1_dataset.parquet")

In [None]:
RE= RegressionEvaluator(labelCol="total_amount", predictionCol="Average_amount_paid_per_trip", metricName="rmse")
rmse=RE.evaluate(p3q1_dataset)
print(rmse)

10.954374424777152


### Models

Sampling training and validating data

In [None]:
#Loading training and validating dataset that have been removed outliers
validating_dataset=spark.read.parquet("/FileStore/valid_data_cleaned.parquet")


In [None]:
training_dataset=spark.read.parquet("/FileStore/train_data_cleaned.parquet")

In [None]:
training_dataset.sample(fraction=0.1, seed=8).write.parquet("/FileStore/train_data_sampling.parquet")

In [None]:
training_dataset_sample=spark.read.parquet("/FileStore/train_data_sampling.parquet")

In [None]:
validating_dataset_sample=validating_dataset.sample(fraction=0.1, seed=8)

#### Decision Tree

In [None]:
from pyspark.ml.regression import DecisionTreeRegressor

In [None]:
#Instantiate a DecisionTree and provide the feature and target columns. Save it into a variable called Xg
DT = DecisionTreeRegressor(featuresCol='features', labelCol='total_amount')

In [None]:
DT_model = DT.fit(training_dataset_sample)

In [None]:
#Use the trained DecisionTree to make prediction on the training and validating set.
train_preds = DT_model.transform(training_dataset_sample)
valid_preds = DT_model.transform(validating_dataset_sample)

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
evaluator = RegressionEvaluator(labelCol='total_amount', predictionCol="prediction",metricName='rmse')

In [None]:
#Print the RMSE scores for the predictions on the training and validatin sets 
print(f"[RMSE] train:{evaluator.evaluate(train_preds)} - validate: {evaluator.evaluate(valid_preds)}")

[RMSE] train:2.550293410128402 - validate: 2.549486052314351


#### Random Forest

In [None]:
#Import RandomForestRegressor from pyspark.ml.classification
from pyspark.ml.regression import RandomForestRegressor

In [None]:
#Instantiate a RandomForest and provide the feature and target columns. Save it into a variable called rf
rf = RandomForestRegressor(featuresCol='features', labelCol='total_amount')

In [None]:
#Fit the RandomForest with the training set
rf_model = rf.fit(training_dataset_sample)

In [None]:
#Use the trained RandomForest to make prediction on the training set. Save the results in a variable called train_preds
train_preds = rf_model.transform(training_dataset_sample)

In [None]:
valid_preds = rf_model.transform(validating_dataset_sample)

In [None]:
#Print the RMSE scores for the predictions on the training and validating sets
print(f"[RMSE] train:{evaluator.evaluate(train_preds)} - validate: {evaluator.evaluate(valid_preds)}")

[RMSE] train:2.4167265280028802 - validate: 2.4195401479143057


### Access the best model

In [None]:
testing_dataset=spark.read.parquet("/FileStore/testing_dataset.parquet")
test_preds = rf_model.transform(testing_dataset)

In [None]:
print(f"[RMSE] test of random forest model: {evaluator.evaluate(test_preds)}")

[RMSE] test of random forest model: 4.744471629624487
