In [None]:
#creating variables for account name, access key and container's name
storage_account_name = "shiv007"
storage_account_access_key = "Your_Storage_Access_Key"
container_name = "at2"

In [None]:
#Mounting all the data from the Azure blob storage into DBFS
dbutils.fs.mount(
  source = f'wasbs://{container_name}@{storage_account_name}.blob.core.windows.net',
  mount_point = f'/mnt/{container_name}/',
  extra_configs = {'fs.azure.account.key.' + storage_account_name + '.blob.core.windows.net': storage_account_access_key}
)

##PART 1

In [None]:
#importing the required libraries
import pandas as pd
import numpy as np
import glob 
import os
import pyarrow.parquet as pq
import datetime
from pyspark.sql.functions import *
import pyspark.sql.functions as F


In [None]:
#To check the contents mounted into the yellow taxi directory where airport fees is of type double
dbutils.fs.ls("/mnt/at2/yellow/double")

In [None]:
#reading the yellow taxi into two seperate variables one for double and the other for integer type airport fees
df1 = spark.read.parquet('dbfs:/mnt/at2/yellow/double/*.parquet')
df2 = spark.read.parquet('dbfs:/mnt/at2/yellow/integer')
df2.count()

In [None]:
#changing the data type of the airport fee column to double for all yellow taxi datasets 
df2 = df2.withColumn("airport_fee",df2.airport_fee.cast('double'))
df2.count()

In [None]:
#merging all the yellow taxi datasets into one
df3 = df1.unionByName(df2)

In [None]:
#counting the total number of rows for all the yellow taxi data
df3.count()

In [None]:
#Two temporary columns created
df3 = df3.withColumn("suma", col("fare_amount")+col("extra")+col("mta_tax")+col("tip_amount")+col("tolls_amount")+col("improvement_surcharge"))
df3 = df3.withColumn("sumb", col("total_amount")+col("airport_fee")+col("congestion_surcharge"))

#Adding  airport_fee and congestion_surcharge to total_amount where they weren't added
df3 = df3.withColumn('final_sum', F.when(F.col('total_amount') == F.col('suma'),col("sumb")).otherwise(df3.total_amount))
df3 = df3.drop('total_amount')
df3 = df3.withColumnRenamed("final_sum","total_amount")
df3 = df3.drop('final_sum', 'suma', 'sumb')

In [None]:
#changing column names, dropping the airport_fee column and adding a colour column
df3 = df3.withColumnRenamed("tpep_pickup_datetime","pickup_datetime")
df3 = df3.withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")
df3 = df3.drop("airport_fee")
df3.printSchema()
df3 = df3.withColumn("Colour", lit("yellow"))
df3.count()

In [None]:
# reading the green taxi data, removing extra columns, renaming some columns and chaning the datatype of the "payment_type" column to "long"
dfg = spark.read.parquet("dbfs:/mnt/at2/green")
dfg = dfg.withColumnRenamed("lpep_pickup_datetime","pickup_datetime")
dfg = dfg.withColumnRenamed("lpep_dropoff_datetime","dropoff_datetime")
dfg = dfg.withColumn("payment_type",dfg.payment_type.cast('long'))
dfg = dfg.drop("ehail_fee","trip_type")
dfg = dfg.withColumn("Colour", lit("green"))


In [None]:
# counting the number of rows for all the green taxi data
dfg.count()

In [None]:
#merging the yellow and the green taxi datasets into one 
df3 = df3.unionByName(dfg)

df3.printSchema()

In [None]:
df = spark.read.parquet("dbfs:/mnt/at2/yellow/double/yellow_tripdata_2022-04.parquet")
df.write.csv("dbfs:/mnt/at2/yellow/double/csv_file")

In [None]:
#select all the data where passenger_count is not greater than 5
df3 = df3.filter(df3.passenger_count<6)

In [None]:
#Removing trips that are finishing before the starting time
df3 = df3.filter(df3.dropoff_datetime > df3.pickup_datetime)

#filtering all the data between January 2019 to April 2022
df3 = df3.filter(df3.pickup_datetime < "2022-05-01 00:00:00")
df3 = df3.filter(df3.pickup_datetime > "2018-12-31 23:59:59")

#Removing all the trips where dropoff_datetime doesn't lie in between January 2019 and April 2022
df3 = df3.filter(df3.dropoff_datetime < "2022-05-01 10:00:00")
df3 = df3.filter(df3.dropoff_datetime > "2018-12-31 23:59:59")
df3.count()
df3.printSchema()

In [None]:
df3 = df3.withColumn('DiffInSeconds',col("dropoff_datetime").cast("long") - col('pickup_datetime').cast("long"))
df3 = df3.withColumn('DiffInHours',round(col('DiffInSeconds')/3600, 2))
df3 = df3.withColumn('Speed', round(col("trip_distance")/col("DiffInHours"),2))
df3.show()
df3.count()
df3.printSchema()

In [None]:
#removing all the trips with negative speed
df3 = df3.filter(df3.Speed > 0)
df3.printSchema()

In [None]:
#removing trips with very high average speed
df3 = df3.filter(df3.Speed < 55)
df3.printSchema()

In [None]:
#Removing all the trips below the duration of 2 minutes and above the duration of 10 hours
df3 = df3.filter(df3.DiffInSeconds > 120 )
df3 = df3.filter(df3.DiffInSeconds < 36000)
df3.printSchema()

In [None]:
#Removing trips shorter than 0.06 miles and longer than 50 miles
df3 = df3.filter(df3.trip_distance > 0.06 )
df3 = df3.filter(df3.trip_distance < 50)
df3.printSchema()

In [None]:
#removing negative tip_amount values from the data
df3 = df3.filter(df3.tip_amount >= 0)
df3.printSchema()

In [None]:
#writing the final data into DBFS
df3.repartition(1).write.mode("overwrite").format('parquet').save('dbfs:/mnt/at2/final_data')

##PART 2

In [None]:
#reading the parquet file of the data
d = spark.read.parquet('dbfs:/mnt/at2/final_data/*.parquet')

In [None]:
#converting the parquet file into a table called table_1 into DBFS
d.write.saveAsTable("Table_1")

In [None]:
%sql
/*calculating the total number of trips, day of week having the most trips, hour of the day having the most trips, average passengers, average amount paid per trip and average amount paid per person for a each specific year and month*/
SELECT year, month, No_of_Trips, Week_day, Hour_of_the_Day, avg_passenger_count, avg_amt_per_trip, avg_amt_per_person from
(
select
year, month, No_of_Trips, Week_day, Hour_of_the_Day,  avg_passenger_count, avg_amt_per_trip, avg_amt_per_person ,rank() OVER (PARTITION BY Year, Month ORDER BY No_of_Trips DESC) AS RK from 
  (
  select count(*) AS No_of_Trips, dayofweek(pickup_datetime) as Week_day, hour(pickup_datetime) as Hour_of_the_Day, YEAR(pickup_datetime) AS Year, MONTH(pickup_datetime) AS Month, round(avg(passenger_count),2) as avg_passenger_count, round(avg(total_amount),2) as avg_amt_per_trip, round(avg(total_amount)/avg(passenger_count),2) as avg_amt_per_person  from table_1
  GROUP BY YEAR(pickup_datetime), MONTH(pickup_datetime), hour(pickup_datetime), dayofweek(pickup_datetime)
  Order by YEAR(pickup_datetime), MONTH(pickup_datetime)
  )) where rk = 1;

year,month,No_of_Trips,Week_day,Hour_of_the_Day,avg_passenger_count,avg_amt_per_trip,avg_amt_per_person
2019,1,96067,5,18,1.43,15.85,11.1
2019,2,77568,6,18,1.45,18.08,12.5
2019,3,93832,6,18,1.46,18.82,12.93
2019,4,93026,3,18,1.44,18.65,12.99
2019,5,90210,5,18,1.44,19.7,13.68
2019,6,67419,7,17,1.53,18.58,12.14
2019,7,75869,3,18,1.44,18.79,13.09
2019,8,72567,5,18,1.44,19.56,13.58
2019,9,68122,5,19,1.42,19.43,13.65
2019,10,85036,4,18,1.42,19.52,13.78


In [None]:
%sql
/*calculating the average, median, minimum and maximum trip duration, distance and speed*/
select colour, round(avg(DiffInSeconds/60),2) as Average_TripDuration, 
       round(min(DiffInSeconds/60),2) as Minimum_TripDuration,
       round(max(DiffInSeconds/60),2) as Maximum_TripDuration,
       round(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY DiffInSeconds/60),2) as Median_TripDuration,
       round(avg(trip_distance*1.609),2) as Average_TripDistance,
       round(min(trip_distance*1.609),2) as Minimum_TripDistance,
       round(max(trip_distance*1.609),2) as Maximum_TripDistance,
       round(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY trip_distance*1.609),2) as Median_TripDistance,
       round(avg(Speed*1.609),2) as Average_Speed,
       round(min(Speed*1.609),2) as Minimum_Speed,
       round(max(Speed*1.609),2) as Maximum_Speed,
       round(PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY Speed*1.609),2) as Median_Speed
from table_1
group by colour;

colour,Average_TripDuration,Minimum_TripDuration,Maximum_TripDuration,Median_TripDuration,Average_TripDistance,Minimum_TripDistance,Maximum_TripDistance,Median_TripDistance,Average_Speed,Minimum_Speed,Maximum_Speed,Median_Speed
yellow,14.29,2.02,599.97,11.17,4.89,0.11,80.43,2.74,18.71,0.02,88.48,16.09
green,15.16,2.02,599.98,11.3,5.13,0.11,80.29,3.11,19.4,0.02,88.46,17.35


In [None]:
%sql
/*calculating the percentage of trips where drivers received tips*/
Select round((
select count(tip_amount) 
from table_1
where tip_amount > 0)
/count(tip_amount)*100, 2) AS no_of_t
from table_1;

no_of_t
70.13


In [None]:
%sql
/*calculating the percentage of trips where the tips received by the drivers was greater than $10 */
Select round((
select count(tip_amount) 
from table_1
where tip_amount > 9)
/count(tip_amount)*100, 2) AS no_of_t
from table_1;

no_of_t
2.94


In [None]:
%sql
/*calculating average speed and average distance covered per kilometer for different bins of duration*/
With table2 as
(
SELECT
     CASE 
        WHEN DiffInSeconds/60 < 5 THEN 'Under_5_mins'
        WHEN DiffInSeconds/60 >= 5 AND DiffInSeconds/60 <= 10 THEN '5_To_10mins'
        WHEN DiffInSeconds/60 > 10 AND DiffInSeconds/60 <= 20 THEN '10_To_20mins'
        WHEN DiffInSeconds/60 > 20 AND DiffInSeconds/60 <= 30 THEN '20_To_30mins'
        WHEN DiffInSeconds/60 > 30 AND DiffInSeconds/60 <= 60 THEN '30_To_60mins'
        WHEN DiffInSeconds/60 > 60 THEN 'At_least_60mins'
    End as BinsOfDurations,
    Speed, trip_distance, total_amount
FROM
    table_1
)
SELECT
    BinsOfDurations,
    count(*) AS No_Of_Trips, round((avg(Speed)*1.609), 2) as Avg_Speed_KMPH, ROUND(sum((trip_distance)*1.609)/sum(total_amount), 2) AS Avg_DistanceKm_perDollar
FROM
    table2
GROUP BY
    BinsOfDurations;

BinsOfDurations,No_Of_Trips,Avg_Speed_KMPH,Avg_DistanceKm_perDollar
At_least_60mins,1263951,23.07,0.4
Under_5_mins,19674997,18.95,0.13
30_To_60mins,10765267,27.32,0.37
20_To_30mins,18456627,21.57,0.3
5_To_10mins,45955904,16.84,0.18
10_To_20mins,53388409,17.51,0.24


##PART 3

In [None]:
#reading the data into a variable dna to ready it up for machine learning 
dna = df3
dna = dna.drop("DiffInHours","Speed","Colour","store_and_fwd_flag","VendorID","tore_and_fwd_flag", "Fare_amount", "PULocationID", "DOLocationID", "Trip_type")
dna.printSchema()


In [None]:
#creating three new columns as month, year and day
dna = dna.withColumn('month',month(dna.pickup_datetime))
dna = dna.withColumn('year',year(dna.pickup_datetime))
dna = dna.withColumn('day',dayofmonth(dna.pickup_datetime))
dna.printSchema()

In [None]:
#casting the data type of column payment_type to "long and removing all the unpredictable value columns"
dna = dna.withColumn("payment_type", dna.payment_type.cast('long'))
ls= ["1","2","5"]
dna = dna.filter(dna.payment_type.isin(ls))

In [None]:
#Removing the column "dropoff_datetime"
dna = dna.drop("dropoff_datetime" )
dna.printSchema()

In [None]:
#Removing the column payment_type
dna = dna.drop("payment_type")

In [None]:
#replacing '-' with '_' in the entire dataset
for col in dna.columns:
    dna = dna.withColumnRenamed(col, col.strip().replace('-', '_'))

In [None]:
dna1 = dna.filter(dna.pickup_datetime < "2022-04-01 00:00:00")
dna1 = dna1.drop("pickup_datetime")

In [None]:
colz = ['DiffInSeconds','passenger_count','RatecodeID','month','year', 'day', 'mta_tax','trip_distance','extra','tolls_amount', 'improvement_surcharge', 'congestion_surcharge','tip_amount']

In [None]:
num_cols = ['DiffInSeconds','trip_distance','extra','tolls_amount', 'improvement_surcharge', 'congestion_surcharge','tip_amount']

In [None]:
total_amount = ['total_amount']

In [None]:
cat_cols = ['passenger_count','RatecodeID','month','year', 'day', 'mta_tax']

In [None]:
dna1.printSchema()

##GRADIENT-BOOSTED TREE REGRESSOR

In [None]:
#importing the required libraries 
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [None]:
stages = []

In [None]:
#applying string indexer over the categorical value columns
for cat_col in cat_cols:

    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")

    stages += [col_indexer]


cat_cols_ind = [f"{cat_col}_ind" for cat_col in cat_cols]
cat_cols_ind = [x.replace('_ind', '') for x in cat_cols_ind]

In [None]:
#Applying GB Tree Regressor
regressor = GBTRegressor(featuresCol = 'features', labelCol = "total_amount")

In [None]:
#Using Vector assembler and creating a pipeline
assembler = VectorAssembler(inputCols=cat_cols_ind + num_cols, outputCol="features")
pipeline  = Pipeline(stages = [assembler,regressor])

In [None]:
#fitting the data through pipeline in the model
pipeline_model = pipeline.fit(dna1)

In [None]:
#saving the pipeline
pipeline_model.write().overwrite().save("pipeline_save")
pipeline_Model = PipelineModel.load('dbfs:/pipeline_save')

In [None]:
#testing over different number of iterations
paramGrid = ParamGridBuilder().addGrid(regressor.maxIter,[5, 10, 20]).build()

In [None]:
#splitting the data into train and test in tehe ratio of 80% isto 20%
train_data , test_data = dna1.randomSplit([0.8,0.2], seed = 123)

In [None]:
#applying cross validation over 3 folds
from pyspark.ml.tuning import CrossValidator
crossval = CrossValidator(estimator = pipelineModel,
                         estimatorParamMaps = paramGrid,
                         evaluator = RegressionEvaluator(labelCol = "total_amount"),
                         numFolds = 3)

crossval

In [None]:
#fitting the cross-validated model
cvModel1 = crossval.fit(train_data)

In [None]:
#fitting the best model
bestModel = cvModel1.bestModel
print(bestModel.stages)

In [None]:
#drawing predictions over test data
pred = cvModel1.transform(test_data)

In [None]:
#calculating RMSE
eval = RegressionEvaluator(labelCol = 'total_amount')
rmse = eval.evaluate(pred, {eval.metricName:'rmse'})

In [None]:
#printing RMSE
print("RMSE: %.3f" %rmse)

##RANDOM FOREST CLASSIFIER

In [None]:
from pyspark.ml.regression import RandomForestRegressor

In [None]:
stages = []

In [None]:
for cat_col in cat_cols:

    col_indexer = StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind")

    stages += [col_indexer]


cat_cols_ind = [f"{cat_col}_ind" for cat_col in cat_cols]
cat_cols_ind = [x.replace('_ind', '') for x in cat_cols_ind]

In [None]:
regressor = RandomForestRegressor(featuresCol = "features",labelCol = "total_amount")

In [None]:
assembler = VectorAssembler(inputCols=cat_cols_ind + num_cols, outputCol="features")
pipeline  = Pipeline(stages = [assembler,regressor])

In [None]:
pipeline_modelrf = pipeline.fit(dna1)

In [None]:
pipeline_modelrf.write().overwrite().save("pipeline_save_rf")
pipeline_Modelrf = Pipeline.load('dbfs:/pipeline_save_rf')

In [None]:
paramGrid = ParamGridBuilder().addGrid(regressor.numTrees,[100,500]) .build()

In [None]:
train_data , test_data = dna1.randomSplit([0.8,0.2], seed = 123)

In [None]:
from pyspark.ml.tuning import CrossValidator
crossval = CrossValidator(estimator = pipelineModel,
                         estimatorParamMaps = paramGrid,
                         evaluator = RegressionEvaluator(labelCol = "total_amount"),
                         numFolds = 3)

crossval

In [None]:
cvModel = crossval.fit(train_data)

In [None]:
bestModel = cvModel.bestModel
print(bestModel.stages)

In [None]:
pred = cvModel.transform(test_data)

In [None]:
eval = RegressionEvaluator(labelCol = 'total_amount')
rmse = eval.evaluate(pred, {eval.metricName:'rmse'})

In [None]:
print("RMSE: %.3f" %rmse)

##Predicting April 2022

In [None]:
#selecting data for only April 2022
dna2 = dna.filter(dna.pickup_datetime > "2022-03-31 23:59:59")
dna2 = dna2.drop("pickup_datetime")

In [None]:
#applying predction using GB Tree model
pred = cvModel1.transform(dna2)

In [None]:
#Calculating RMSE
eval = RegressionEvaluator(labelCol = 'total_amount')
rmse = eval.evaluate(pred, {eval.metricName:'rmse'})

In [None]:
#Printing RMSE
print("RMSE: %.3f" %rmse)

In [None]:
import seaborn as sns

In [None]:
#Plot actual vs prediction april 2022
dnos = pred.select('total_amount', 'prediction')
pad = dnos.toPandas()
sns.regplot(x="total_amount", y="prediction", data=pad);