## Analysis of New York City Taxi Trips

### PART 1: Data Ingestion and Preparation

#### 1. Download the dataset for yellow and green taxi cabs from Jan 2019 to Apr 2022 and load it into an Azure Blob Storage.

#### 2. Read the files from the Azure storage and make a copy of it into DBFS.

In [None]:
# Create 3 variables:
storage_account_name = "xxx"
storage_account_access_key = "xxx"
blob_container_name = "xxx"

In [None]:
#Mount the blob container into the Databricks Files System:
dbutils.fs.mount(
  source = f'wasbs://{blob_container_name}@{storage_account_name}.blob.core.windows.net',
  mount_point = f'/mnt/{blob_container_name}/',
  extra_configs = {'fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net': storage_account_access_key}
)

In [None]:
# Read the parquet files for yellow taxi where airport_fee has data type of double and make a copy of it into DBFS
df = spark.read.parquet("/mnt/bde-assignment-2/yellow/double")
df.write.parquet('/dbfs/yellow', mode='append')

In [None]:
# Read the parquet files for yellow taxi where airport_fee has data type of integer, convert the data type to double and make a copy of it into DBFS
df = spark.read.parquet("/mnt/bde-assignment-2/yellow/integer")
df = df.withColumn("airport_fee",df.airport_fee.cast('double'))
df.write.parquet('/dbfs/yellow', mode='append')

In [None]:
# Read all the parquet files for green taxi and make a copy of it into DBFS
df = spark.read.parquet("/mnt/bde-assignment-2/green_tripdata_*.parquet")
df.cache()
df.write.parquet('/dbfs/green')

#### 3. Count the total numbers of rows for each taxi colour (yellow and green) by reading the files stored on DBFS.

In [None]:
# Read the yellow taxi file stored on DBFS
yellow=spark.read.parquet("/dbfs/yellow")
yellow.cache()

Out[18]: DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double]

In [None]:
# Count the total numbers of rows for yellow taxi
yellow.count()

Out[5]: 152823008

In [None]:
# Read the green taxi file stored on DBFS
green=spark.read.parquet("/dbfs/green")
green.cache()

Out[19]: DataFrame[VendorID: bigint, lpep_pickup_datetime: timestamp, lpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, ehail_fee: double, improvement_surcharge: double, total_amount: double, payment_type: double, trip_type: double, congestion_surcharge: double]

In [None]:
# Count the total numbers of rows for green taxi
green.count()

Out[8]: 9390483

#### 4. Convert the “Yellow” Apr 2022 parquet into a csv file and send it to your Azure Blob Storage.

In [None]:
# Convert parquet file to csv file (credit: https://learn.microsoft.com/en-us/answers/questions/64449/how-to-move-compressed-parquet-file-using-adf-or-d.html)
df=spark.read.parquet("/mnt/bde-assignment-2/yellow/double/yellow_tripdata_2022-04.parquet")
df.write.csv("/mnt/bde-assignment-2/yellow_tripdata_2022-04.csv")

#### 5. Explore the dataset and perform any required data cleaning to remove unrealistic trips.

In [None]:
# Import pyspark.sql.functions as F and import round() function from pyspark.sql.functions
import pyspark.sql.functions as F
from pyspark.sql.functions import round

**[5.1]** Remove unrealistic trips for yellow taxi

In [None]:
# Remove trips which finish before the starting time
yellow = yellow.filter(yellow['tpep_pickup_datetime'] <= yellow['tpep_dropoff_datetime'])

In [None]:
# Add new column 'trip_duration' (seconds) by calculating the gap between dropoff time and pickup time
# Add new column 'speed'(mile per hour) where speed is equal to trip distance divided by trip duration
yellow = yellow.withColumn("trip_duration", F.col("tpep_dropoff_datetime").cast("long")-F.col("tpep_pickup_datetime").cast("long")).\
    withColumn("speed",round(3600*F.col("trip_distance")/F.col("trip_duration"),1))

In [None]:
# Remove trips with negative speed
yellow=yellow.filter(F.col("speed")>=0)

# Remove trips with high speed within and outside NYC (speed limit inside NYC is 30mph and outside NYC is 65mph)
yellow=yellow.filter(F.col("speed")<=65)

# Remove trips with high speed within NYC (speed limit inside NYC is 30mph). Assume the when both PULocationID and DOLocationID are NYC location code (2-263) then trip is within NYC. Location code 1 represents Newark Airport (EWR).
yellow=yellow.filter( ((F.col("PULocationID").between(2,263))&\
                     (F.col("DOLocationID").between(2,263))&\
                     (F.col("speed")<=30))|\
                     (F.col("PULocationID")==1)|\
                     (F.col("DOLocationID")==1 ))
                                          
# Remove trips that are travelling too short or too long (duration wise). Assume the trip should be no less than 2 mins and no more than 3 hours
yellow=yellow.filter( (F.col("trip_duration")<=3*3600) & (F.col("trip_duration")>=2*60) )

# Remove trips that are travelling too short or too long (distance wise). Assume the trip should be no shorter than 0.2 mile (321m) and no longer than 195miles (3hrs drive at the speed of 65mph)
yellow=yellow.filter( (F.col("trip_distance")>=0.2) & (F.col("trip_distance")<=195) )

# Remove the trips that have more than 6 passengers (passenger_count field can be null)
# Note: A Driver must not permit more than four Passengers to ride in a four-Passenger Vehicle, nor more than five Passengers in a fivePassenger Vehicle, except that an additional Passenger must be accepted if the Passenger is under the age of seven (7) and is held on the lap of an adult Passenger seated in the rear.) from  https://www1.nyc.gov/assets/tlc/downloads/pdf/rule_book_current_chapter_54.pdf
yellow=yellow.filter((F.col("passenger_count")<=6)|(F.col("passenger_count").isNull())) 

# Remove the trips that have negative total amount when the payment_type is 1(credit card) or 2(cash)
yellow=yellow.filter(((F.col("total_amount")>0) &\
                   (F.col("payment_type").isin([1,2]))) |\
                    (F.col("payment_type").isin([3,4,5,6]))|\
                   (F.col("payment_type").isNull())    
                  )

# Remove the records where pickup time is earlier than 2019-01-01 and later than 2022-05-01
yellow=yellow.filter((F.col('tpep_pickup_datetime')>='2019-01-01') & (F.col('tpep_pickup_datetime')<'2022-05-01'))   

# Remove the records with extreme high value(>$300) of total amount
yellow=yellow.filter(F.col("total_amount")<=300)

# Remove the duplicate records
yellow=yellow.dropDuplicates() 

In [None]:
# Count total rows of cleaned yellow taxi data (140730707 rows - around 92% of original total rows)
yellow.count()

Out[11]: 140730707

**[5.2]** Remove unrealistic trips for green taxi

In [None]:
# Remove trips which finish before the starting time
green = green.filter(F.col("lpep_pickup_datetime") <= F.col("lpep_dropoff_datetime"))

In [None]:
# Add new column 'trip_duration' (seconds) by calculating the gap between dropoff time and pickup time
# Add new column 'speed'(mile per hour) where speed is equal to trip distance divided by trip duration
green = green.withColumn("trip_duration", F.col("lpep_dropoff_datetime").cast("long")-F.col("lpep_pickup_datetime").cast("long")).\
    withColumn("speed",round(3600*F.col("trip_distance")/F.col("trip_duration"),1))

In [None]:
# Remove trips with negative speed
green=green.filter(F.col("speed")>=0)

# Remove trips with high speed within and outside NYC (speed limit inside NYC is 30mph and outside NYC is 65mph)
green=green.filter(F.col("speed")<=65)

# Remove trips with high speed within NYC (speed limit inside NYC is 30mph). Assume the when both PULocationID and DOLocationID are NYC location code (2-263) then trip is within NYC. Location code 1 represents Newark Airport (EWR).
green=green.filter( ((F.col("PULocationID").between(2,263))&\
                     (F.col("DOLocationID").between(2,263))&\
                     (F.col("speed")<=30))|\
                     (F.col("PULocationID")==1)|\
                     (F.col("DOLocationID")==1 ))

# Remove trips that are travelling too short or too long (duration wise). Assume the trip should be no less than 2 mins and no more than 3 hours
green=green.filter( (F.col("trip_duration")<=3*3600) & (F.col("trip_duration")>=2*60) )

# Remove trips that are travelling too short or too long (distance wise). Assume the trip should be no shorter than 0.2 mile (321m) and no longer than 195miles (3hrs drive at the speed of 65mph)
green=green.filter( (F.col("trip_distance")>=0.2) & (F.col("trip_distance")<=195) )

# Remove the trips that have more than 6 passengers (passenger_count field can be null)
# Note: A Driver must not permit more than four Passengers to ride in a four-Passenger Vehicle, nor more than five Passengers in a fivePassenger Vehicle, except that an additional Passenger must be accepted if the Passenger is under the age of seven (7) and is held on the lap of an adult Passenger seated in the rear.) from  https://www1.nyc.gov/assets/tlc/downloads/pdf/rule_book_current_chapter_54.pdf
green=green.filter((F.col("passenger_count")<=6)|(F.col("passenger_count").isNull())) 

# Remove the trips that have negative total amount when the payment_type is 1(credit card) or 2(cash)
green=green.filter(((F.col("total_amount")>0) &\
                   (F.col("payment_type").isin([1,2]))) |\
                    (F.col("payment_type").isin([3,4,5,6]))|\
                   (F.col("payment_type").isNull())    
                  )

# Remove the records where pickup time is earlier than 2019-01-01 and later than 2022-05-01
green=green.filter((F.col('lpep_pickup_datetime')>='2019-01-01') & (F.col('lpep_pickup_datetime')<'2022-05-01'))  

# Remove the records with extreme high value(>$300) of total amount 
green=green.filter(F.col("total_amount")<=300)

# Remove the duplicate records
green=green.dropDuplicates()  

In [None]:
# Count total rows of cleaned green taxi data (8669048rows - around 92% of original total rows)
green.count()

Out[10]: 8669048

#### 6. Combine the yellow and green taxi dataset together

In [None]:
# Rename the pickup and dropoff datetime columns for both green taxi and yellow taxi.
green=green.withColumnRenamed("lpep_pickup_datetime","pickup_datetime").withColumnRenamed("lpep_dropoff_datetime","dropoff_datetime")
yellow=yellow.withColumnRenamed("tpep_pickup_datetime","pickup_datetime").withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")

In [None]:
# Import lit() function from pyspark.sql.functions
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType,StringType

In [None]:
# Add the airport_fee column in green taxi dataset
green=green.withColumn("airport_fee",lit(None)).\
     withColumn("color",lit('green')).\
     withColumn("airport_fee",F.col("airport_fee").cast(DoubleType())).\
     withColumn("color",F.col("color").cast(StringType()))

In [None]:
# Add the ehail_fee and trip_type columns in yellow taxi dataset
yellow=yellow.withColumn("ehail_fee",lit(None)).\
    withColumn("trip_type",lit(None)).\
    withColumn("color",lit('yellow')).\
    withColumn("ehail_fee",F.col("ehail_fee").cast(DoubleType())).\
    withColumn("trip_type",F.col("trip_type").cast(DoubleType())).\
    withColumn("color",F.col("color").cast(StringType()))

In [None]:
# Combine the yellow and green taxi dataset together
yellow_and_green=yellow.unionByName(green)

#### 7. Export the combined data into a parquet file in DBFS and then load it as a table or view

In [None]:
# Export the combined data into the parquet file 'yellow_and_green'
yellow_and_green.write.parquet('/dbfs/yellowandgreen')

In [None]:
# Read the data from parquet file
data=spark.read.parquet("/dbfs/yellowandgreen")
data.cache()

Out[33]: DataFrame[VendorID: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double, trip_duration: bigint, speed: double, ehail_fee: double, trip_type: double, color: string]

In [None]:
# Create a view called "taxi"
data.createOrReplaceTempView("taxi")

### PART 2: Business Questions

#### 1. By Year and Month

In [None]:
part2_q1=spark.sql(
'''
With summary_dayofweek As (
Select
cast(date_trunc('MM',pickup_datetime) As Date) year_month,
Case when weekday(pickup_datetime)=0 then 'Monday'
    when weekday(pickup_datetime)=1 then 'Tuesday'
    when weekday(pickup_datetime)=2 then 'Wednesday'
    when weekday(pickup_datetime)=3 then 'Thursday'
    when weekday(pickup_datetime)=4 then 'Friday'
    when weekday(pickup_datetime)=5 then 'Saturday'
    when weekday(pickup_datetime)=6 then 'Sunday'
End as dayofweek,
count (*) numberoftrips_day
From  taxi
Group by 1,2
)
, rank_day As (
Select *
From (
Select *,
row_number() over (partition by year_month order by numberoftrips_day desc) rk_day
From summary_dayofweek
    )
Where rk_day=1
)
, summary_hour As (
Select
cast(date_trunc('MM',pickup_datetime) As Date) year_month,
hour(pickup_datetime) hourofday,
count (*) numberoftrips_hour
From taxi
Group by 1,2
)
, rank_hour As (
Select *
From (
Select *,
row_number() over (partition by year_month order by numberoftrips_hour desc) rk_hour
From summary_hour
    )
Where rk_hour=1
)
, trip_passenger As (
Select
cast(date_trunc('MM',pickup_datetime) As Date) year_month,
Count(*) numberoftrips,
avg(passenger_count)::decimal(10,2) average_passengers,
avg(total_amount)::decimal(10,2) average_amount_paid_per_trip,
(sum(total_amount)/sum(passenger_count))::decimal(10,2) average_amount_paid_per_passenger
From taxi
Group by 1
)

Select 
tp.year_month,
numberoftrips,
dayofweek,
hourofday,
average_passengers,
average_amount_paid_per_trip,
average_amount_paid_per_passenger
From trip_passenger tp
Join rank_day rd
On tp.year_month=rd.year_month
Join rank_hour rh
On tp.year_month=rh.year_month 
Order by 1
'''
)

display(part2_q1)

year_month,numberoftrips,dayofweek,hourofday,average_passengers,average_amount_paid_per_trip,average_amount_paid_per_passenger
2019-01-01,7739553,Thursday,18,1.55,14.86,9.62
2019-02-01,7202004,Friday,18,1.55,17.82,11.52
2019-03-01,8031031,Friday,18,1.56,18.34,11.82
2019-04-01,7563635,Tuesday,18,1.56,18.45,11.92
2019-05-01,7682470,Thursday,18,1.55,18.81,12.16
2019-06-01,7039764,Saturday,18,1.55,18.89,12.21
2019-07-01,6347126,Wednesday,18,1.56,18.54,11.97
2019-08-01,6092175,Thursday,18,1.56,18.56,11.99
2019-09-01,6583009,Thursday,18,1.54,18.95,12.41
2019-10-01,7231464,Thursday,18,1.53,18.85,12.46


#### 2. By Taxi Color

In [None]:
# The source for calculation of median is from https://gist.github.com/daniarleagk/6df2f695f2397fa38a9cf70f9c829d0e
part2_q2=spark.sql(
'''
With med_duration As
(  
SELECT color, (avg(trip_duration)/60)::decimal(10,2) as median_duration
FROM ( SELECT color, trip_duration, rN, (CASE WHEN cN % 2 = 0 then (cN DIV 2) ELSE (cN DIV 2) + 1 end) as m1, (cN DIV 2) + 1 as m2 
        FROM ( 
            SELECT color, trip_duration, row_number() OVER (PARTITION BY color ORDER BY trip_duration ) as rN, count(trip_duration) OVER (PARTITION BY color ) as cN
            FROM taxi
         ) s
    ) r
WHERE rN BETWEEN m1 and m2
GROUP BY color     
)

, med_distance As
(
SELECT color, (avg(trip_distance)*1.60934)::decimal(10,2) as median_distance
FROM ( SELECT color, trip_distance, rN, (CASE WHEN cN % 2 = 0 then (cN DIV 2) ELSE (cN DIV 2) + 1 end) as m1, (cN DIV 2) + 1 as m2 
        FROM ( 
            SELECT color, trip_distance, row_number() OVER (PARTITION BY color ORDER BY trip_distance ) as rN, count(trip_distance) OVER (PARTITION BY color ) as cN
            FROM taxi
         ) s
    ) r
WHERE rN BETWEEN m1 and m2
GROUP BY color        
)

, med_speed As
(
SELECT color, (avg(speed)*1.60934)::decimal(10,2) as median_speed
FROM ( SELECT color, speed, rN, (CASE WHEN cN % 2 = 0 then (cN DIV 2) ELSE (cN DIV 2) + 1 end) as m1, (cN DIV 2) + 1 as m2 
        FROM ( 
            SELECT color, speed, row_number() OVER (PARTITION BY color ORDER BY speed ) as rN, count(speed) OVER (PARTITION BY color ) as cN
            FROM taxi
         ) s
    ) r
WHERE rN BETWEEN m1 and m2
GROUP BY color        
)
, avg_min_max As (
Select
color,
(avg(trip_duration)/60)::decimal(10,2) avgerage_duration,
(min(trip_duration)/60)::decimal(10,2) min_duration,
(max(trip_duration)/60)::decimal(10,2) max_duration,
(avg(trip_distance)*1.60934)::decimal(10,2) average_distance, --convert from mile to km
(min(trip_distance)*1.60934)::decimal(10,2) min_distance,
(max(trip_distance)*1.60934)::decimal(10,2) max_distance,
(avg(speed)*1.60934)::decimal(10,2) average_speed,  --convert from mile per hour to km per hour
(min(speed)*1.60934)::decimal(10,2) min_speed,
(max(speed)*1.60934)::decimal(10,2) max_speed
From taxi
Group by 1
)

Select
amm.color,
avgerage_duration,
median_duration,
min_duration,
max_duration,
average_distance,
median_distance
min_distance,
max_distance,
average_speed,
median_speed,
min_speed,
max_speed

From avg_min_max amm
Join med_duration mdu
On amm.color = mdu.color
Join med_distance mdi
On amm.color=mdi.color
Join med_speed msp
On amm.color=msp.color

'''
)
display(part2_q2)

color,avgerage_duration,median_duration,min_duration,max_duration,average_distance,min_distance,max_distance,average_speed,median_speed,min_speed,max_speed
green,16.85,12.58,2.0,179.87,5.89,3.54,134.38,19.63,17.7,0.16,90.28
yellow,13.98,10.97,2.0,180.0,4.46,2.72,191.03,17.88,16.25,0.16,104.61


#### 3. Percentage of trips where drivers received tips

In [None]:
part2_q3=spark.sql(
'''
Select
(100*count(*)/(Select count(*) From taxi))::decimal(10,2) percentage_with_tips
From taxi
Where tip_amount>0
'''
)
display(part2_q3)

percentage_with_tips
69.82


#### 4. Percentage where the driver received tips of at least $10

In [None]:
part2_q4=spark.sql(
'''
Select
(100*count(*)/(Select count(*) From taxi Where tip_amount>0))::decimal(10,2) percentage_with_tips_over10dollars
From taxi
Where tip_amount>=10
'''
)
display(part2_q4)

percentage_with_tips_over10dollars
2.63


#### 5. Classify each trip into bins of durations

In [None]:
part2_q5=spark.sql(
'''
Select
Case when trip_duration <5*60 then'a. Under 5 mins'
    when trip_duration >=5*60 and trip_duration <10*60 then'b. From 5 mins to 10 mins'
    when trip_duration >=10*60 and trip_duration <20*60 then'c. From 10 mins to 20 mins'
    when trip_duration >=20*60 and trip_duration <30*60 then'd. From 20 mins to 30 mins'
    when trip_duration >=30*60 and trip_duration <60*60 then'e. From 30 mins to 60 mins'
    when trip_duration >=60*60 then'f. At least 60 mins'
    Else trip_duration
End as trip_duration_bins,
(avg(speed)*1.60934)::decimal(10,2) average_speed,
((sum(trip_distance)/sum(total_amount))*1.60934)::decimal(10,2) average_distance_per_dollar
From taxi
Group by 1
Order by 1

'''
)
display(part2_q5)

trip_duration_bins,average_speed,average_distance_per_dollar
a. Under 5 mins,18.98,0.13
b. From 5 mins to 10 mins,16.72,0.17
c. From 10 mins to 20 mins,16.87,0.23
d. From 20 mins to 30 mins,19.39,0.29
e. From 30 mins to 60 mins,24.51,0.36
f. At least 60 mins,22.86,0.4


#### 6. Choose a duration bin to maximise a driver's income (rerun)

In [None]:
part2_q6=spark.sql(
'''
Select
Case when trip_duration <5*60 then'a. Under 5 mins'
    when trip_duration >=5*60 and trip_duration <10*60 then'b. From 5 mins to 10 mins'
    when trip_duration >=10*60 and trip_duration <20*60 then'c. From 10 mins to 20 mins'
    when trip_duration >=20*60 and trip_duration <30*60 then'd. From 20 mins to 30 mins'
    when trip_duration >=30*60 and trip_duration <60*60 then'e. From 30 mins to 60 mins'
    when trip_duration >=60*60 then'f. At least 60 mins'
    Else trip_duration
End as trip_duration_bins,
(sum(total_amount)/(sum(trip_duration)/3600))::decimal(10,2) average_dollar_per_hour
From taxi
Group by 1
Order by 1

'''
)
display(part2_q6)

trip_duration_bins,average_dollar_per_hour
a. Under 5 mins,145.21
b. From 5 mins to 10 mins,95.49
c. From 10 mins to 20 mins,73.16
d. From 20 mins to 30 mins,67.45
e. From 30 mins to 60 mins,69.91
f. At least 60 mins,55.88


### PART 3: Machine Learning

#### 1. Feature Engineering

##### 1.1 Data Quality Check and Imputation

In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

In [None]:
# Find Count of Null of all columns in dataframe
display(data.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in data.columns]))

VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_duration,speed,ehail_fee,trip_type,color
0,0,0,1506715,0,1506715,1506715,0,0,1506715,0,0,0,0,0,1,0,6515722,114252815,0,0,149397212,142239033,0


In [None]:
# Count the number of records for each value in 'passenger_count' field
display(data.groupBy('passenger_count').count().sort(F.col('count').desc()))

passenger_count,count
1.0,106860892
2.0,21636426
3.0,5858865
5.0,5039976
6.0,3084806
0.0,2770730
4.0,2641345
,1506715


In [None]:
# Count the number of records for each value in 'RatecodeID' field
display(data.groupBy('RatecodeID').count().sort(F.col('count').desc()))

RatecodeID,count
1.0,144736046
2.0,2437126
,1506715
5.0,406573
3.0,259258
99.0,44286
4.0,9621
6.0,130


In [None]:
# Count the number of records for each value in 'payment_type' field
display(data.groupBy('payment_type').count().sort(F.col('count').desc()))

payment_type,count
1.0,108724417
2.0,38305386
,1506715
3.0,534211
4.0,328845
5.0,181


In [None]:
# Count the number of records for each value in 'trip_type' field
display(data.groupBy('trip_type').count().sort(F.col('count').desc()))

trip_type,count
,142239484
1.0,6939199
2.0,221072


In [None]:
# Replace the null values with 0 for the variables: 'airport_fee','ehail_fee','congestion_surcharge','improvement_surcharge'
data=data.na.fill(value=0, subset=['airport_fee','ehail_fee','congestion_surcharge','improvement_surcharge'])
data=data.na.fill(value=1, subset=['passenger_count','RatecodeID','payment_type','trip_type'])

##### 1.2 Feature Extraction

In [None]:
# Add new columns including year_month, dayofweek and hourofday and change data type of the variables RatecodeID, payment_type and tr1ip_type
# year_month-->credit:https://stackoverflow.com/questions/64132644/pyspark-keep-only-year-and-month-in-date

data=data.withColumn('year_month', F.date_format(F.col('pickup_datetime'),'yyyy-MM')).\
        withColumn('dayofweek', F.dayofweek(F.col('pickup_datetime'))).\
        withColumn('hourofday', F.hour(F.col('pickup_datetime')))

##### 1.3 Correlation Analysis

In [None]:
# Create the sub dataset for period from Jan 2022 to Mar 2022.
data_sub=data.filter(F.col("year_month").between ('2022-01','2022-03'))

In [None]:
# Convert data_sub to pandas dataframe df
import pandas as pd
df=data_sub.toPandas()

In [None]:
# Correlaion analysis
df.drop(['VendorID','store_and_fwd_flag','PULocationID','DOLocationID'],axis=1).corr()

Unnamed: 0,passenger_count,trip_distance,RatecodeID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,trip_duration,speed,ehail_fee,trip_type,dayofweek,hourofday
passenger_count,1.0,0.021675,-0.020922,0.008408,0.001329,-0.105778,0.009157,0.008734,0.018217,0.004368,0.001821,0.015302,0.013875,0.01685,0.016006,,-0.002832,0.011239,0.019579
trip_distance,0.021675,1.0,0.104931,-0.00172,0.066892,0.040766,-0.067741,0.545162,0.64519,0.007909,0.08427,-0.201054,0.620014,0.843463,0.636668,,0.007126,-0.019288,-0.01004
RatecodeID,-0.020922,0.104931,1.0,-0.026086,0.009823,-0.045268,-0.005209,-0.0393,0.073482,0.002832,0.008836,-0.19332,-0.001016,0.144564,0.017236,,0.013683,-0.003539,-0.030015
payment_type,0.008408,-0.00172,-0.026086,1.0,-0.000686,-0.026923,-0.257264,-0.455372,-0.016522,-0.318926,-0.010524,-0.217191,0.015765,-0.00017,-0.018459,,0.016289,-0.009035,-0.031154
fare_amount,0.001329,0.066892,0.009823,-0.000686,1.0,0.003241,-9.6e-05,0.038626,0.04427,0.009077,0.999647,-0.01113,0.039727,0.062912,0.036333,,0.001585,-0.000136,-0.000987
extra,-0.105778,0.040766,-0.045268,-0.026923,0.003241,1.0,0.07649,0.049731,0.039508,0.062839,0.006788,0.151782,0.037691,0.04343,-0.012974,,-0.016998,0.005474,0.126543
mta_tax,0.009157,-0.067741,-0.005209,-0.257264,-9.6e-05,0.07649,1.0,-0.017603,-0.123106,0.820306,0.001251,0.434098,0.019469,-0.037213,-0.055888,,-0.148618,0.001479,0.012325
tip_amount,0.008734,0.545162,-0.0393,-0.455372,0.038626,0.049731,-0.017603,1.0,0.433525,0.050286,0.06241,0.00173,0.343907,0.491678,0.314311,,-0.005971,0.00034,0.023669
tolls_amount,0.018217,0.64519,0.073482,-0.016522,0.04427,0.039508,-0.123106,0.433525,1.0,0.017487,0.063516,-0.103633,0.472785,0.473307,0.430299,,-0.00207,-0.015402,-0.008517
improvement_surcharge,0.004368,0.007909,0.002832,-0.318926,0.009077,0.062839,0.820306,0.050286,0.017487,1.0,0.012939,0.378469,0.022235,0.012529,0.001403,,-0.004243,0.000531,0.00421


#### 2. Modelling and Evaluation

##### 2.1 Split training and testing sets

In [None]:
# Split training and testing sets with a 80-20 ratio and set seed to 8. 
# train_data,test_data=data_sub.randomSplit([0.8, 0.2], seed=8)
data_fit=data[F.col('year_month')!='2022-04']
train_data,test_data=data_fit.randomSplit([0.8, 0.2], seed=8)
train_data.cache()
test_data.cache()

Out[6]: DataFrame[VendorID: bigint, pickup_datetime: timestamp, dropoff_datetime: timestamp, passenger_count: double, trip_distance: double, RatecodeID: double, store_and_fwd_flag: string, PULocationID: bigint, DOLocationID: bigint, payment_type: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double, trip_duration: bigint, speed: double, ehail_fee: double, trip_type: double, color: string, year_month: string, dayofweek: int, hourofday: int]

##### 2.2 Multiple Linear Regression Model

In [None]:
# Import VectorAssembler, Pipeline, LinearRegression and RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Create a new list called num_cols
num_cols=['trip_duration','tolls_amount','tip_amount','airport_fee','speed']

In [None]:
# Instantiate a VectorAssembler and save it into a variable called assembler
assembler=VectorAssembler(inputCols=num_cols, outputCol='features')

In [None]:
# Instantiate a Linear Regression and provide the feature and target columns. Save it into a variable called 'lr'
lr=LinearRegression(featuresCol="features", labelCol="total_amount")

In [None]:
# Create pipeline
lr_stages=[assembler, lr]
lr_pipeline=Pipeline(stages=lr_stages)
# Fit the pipeline model
lr_pipelineModel=lr_pipeline.fit(train_data)

In [None]:
# Make predictions on training and testing datasets
pred_train=lr_pipelineModel.transform(train_data)
pred_test=lr_pipelineModel.transform(test_data)

In [None]:
# Take a look at the output for training dataset
display(pred_train.select("features","total_amount","prediction"))

features,total_amount,prediction
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(389.0, 13.0))",8.3,10.61626254849113
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(144.0, 10.0))",4.8,6.43416568151526
"Map(vectorType -> dense, length -> 5, values -> List(138.0, 0.0, 1.4, 0.0, 10.4))",6.2,8.239651191511818
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(690.0, 12.5))",11.8,13.799097461770575
"Map(vectorType -> dense, length -> 5, values -> List(464.0, 0.0, 1.75, 0.0, 10.1))",10.55,12.221346335936213
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(1108.0, 12.3))",17.8,18.450028622969143
"Map(vectorType -> dense, length -> 5, values -> List(526.0, 0.0, 1.75, 0.0, 10.3))",10.55,13.018475168103432
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(326.0, 11.0))",7.3,8.96699384412048
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(909.0, 15.8))",15.3,17.82622270364201
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(1083.0, 13.6))",17.3,18.773512537971577


In [None]:
# Take a look at the output for testing dataset
display(pred_test.select("features","total_amount","prediction"))

features,total_amount,prediction
"Map(vectorType -> dense, length -> 5, values -> List(748.0, 0.0, 2.7, 0.0, 8.2))",13.5,15.701857892690278
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(635.0, 10.8))",10.3,12.380761381825565
"Map(vectorType -> dense, length -> 5, values -> List(1090.0, 0.0, 3.7, 0.0, 8.9))",18.5,21.11539163845171
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(190.0, 11.4))",5.8,7.610219012048454
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(1418.0, 17.0))",24.3,24.163962819751813
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(488.0, 7.4))",8.8,9.124134597446927
"Map(vectorType -> dense, length -> 5, values -> List(735.0, 0.0, 3.15, 0.0, 21.1))",18.95,22.12213386738365
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(1620.0, 21.1))",31.8,28.371820268954764
"Map(vectorType -> dense, length -> 5, values -> List(1124.0, 0.0, 3.05, 0.0, 8.6))",18.35,20.578033077008094
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(215.0, 15.1))",6.3,9.622262172649492


In [None]:
# Create regression evaluator 'lr_evaluator'
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_amount", metricName="rmse")

In [None]:
# Print the RMSE scores for the predictions on the training and testing sets using lr_evaluator.evaluate()
print(f"RMSE for training set is {lr_evaluator.evaluate(pred_train):0.4f}")
print(f"RMSE for testing set is {lr_evaluator.evaluate(pred_test):0.4f}")

RMSE for training set is 3.0517
RMSE for testing set is 3.0577


##### 2.3 Decision Tree

In [None]:
# Import DecisionTreeRegressor and RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Instantiate a Decision Tree and provide the feature and target columns. Save it into a variable called 'dt'
dt=DecisionTreeRegressor(featuresCol='features',labelCol='total_amount')

In [None]:
# Create pipeline
stages=[assembler, dt]
pipeline=Pipeline(stages=stages)
# Fit the pipeline model
pipelineModel=pipeline.fit(train_data)

In [None]:
# Make predictions on training and testing datasets
pred_train=pipelineModel.transform(train_data)
pred_test=pipelineModel.transform(test_data)

In [None]:
# Take a look at the output for training dataset
display(pred_train.select("features","total_amount","prediction"))

features,total_amount,prediction
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(389.0, 13.0))",8.3,10.41421831478588
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(144.0, 10.0))",4.8,8.741408807161127
"Map(vectorType -> dense, length -> 5, values -> List(138.0, 0.0, 1.4, 0.0, 10.4))",6.2,8.741408807161127
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(690.0, 12.5))",11.8,13.605354206452745
"Map(vectorType -> dense, length -> 5, values -> List(464.0, 0.0, 1.75, 0.0, 10.1))",10.55,10.41421831478588
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(1108.0, 12.3))",17.8,17.818769232514885
"Map(vectorType -> dense, length -> 5, values -> List(526.0, 0.0, 1.75, 0.0, 10.3))",10.55,13.605354206452745
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(326.0, 11.0))",7.3,8.741408807161127
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(909.0, 15.8))",15.3,17.818769232514885
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(1083.0, 13.6))",17.3,17.818769232514885


In [None]:
# Create regression evaluator 'dt_evaluator'
dt_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_amount", metricName="rmse")

In [None]:
# Print the RMSE scores for the predictions on the training and testing sets using dt_evaluator.evaluate()
print(f"RMSE for training set is {dt_evaluator.evaluate(pred_train):0.4f}")
print(f"RMSE for testing set is {dt_evaluator.evaluate(pred_test):0.4f}")

RMSE for training set is 3.8250
RMSE for testing set is 3.8257


##### 2.4 Prediction in April 2022

In [None]:
# Extract the April 2022 trips
data_apr=data[F.col('year_month')=='2022-04']

In [None]:
# Make predictions on April 2022 trips
pred_apr=lr_pipelineModel.transform(data_apr)

In [None]:
# Take a look at the output for April 2022 trips
display(pred_apr.select("features","total_amount","prediction"))

features,total_amount,prediction
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(613.0, 8.2))",9.3,10.916584552815822
"Map(vectorType -> dense, length -> 5, values -> List(1019.0, 0.0, 3.11, 0.0, 7.5))",18.66,18.944743029420792
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(536.0, 8.9))",8.3,10.369598719464546
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(352.0, 9.2))",6.8,8.421307346543871
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(492.0, 10.2))",11.05,10.477430259729584
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(468.0, 8.7))",7.8,9.504369137380255
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(845.0, 17.5))",15.8,17.89389391023175
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(773.0, 6.5))",10.3,11.938525344899237
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(902.0, 10.9))",15.55,15.457955294647348
"Map(vectorType -> sparse, length -> 5, indices -> List(0, 4), values -> List(777.0, 12.5))",13.3,14.786558335568118


In [None]:
# Calculate the RMSE on April 2022 prediction
print(f"RMSE for testing set is {lr_evaluator.evaluate(pred_apr):0.4f}")

RMSE for testing set is 3.3701
