In [0]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.sql.functions import col
from pyspark.sql.functions import year
from pyspark.sql.functions import to_date
from pyspark.sql.functions import month
from pyspark.sql.functions import hour
from pyspark.sql.functions import lit
from pyspark.sql.functions import date_format


In [0]:
#
#
# Loading data and performing transformations
#
#

In [0]:
# Read the taxi trips DataFrame from parquet (dftaxis, produced in part 1)
df_taxis_ml = spark.read.load('/mnt/output/dftaxis', format="parquet")

In [0]:
# Add week_day: the drop-off timestamp formatted with the "EEEE" pattern (e.g. Wednesday)
df_taxis_ml = df_taxis_ml.withColumn(
    "week_day",
    date_format(df_taxis_ml.lpep_dropoff_datetime, "EEEE"),
)


In [0]:
# Add trip_month: month number extracted from the drop-off timestamp
df_taxis_ml = df_taxis_ml.withColumn(
    "trip_month",
    month(col("lpep_dropoff_datetime")),
)


In [0]:
# Add trip_hour: hour of day extracted from the drop-off timestamp
df_taxis_ml = df_taxis_ml.withColumn(
    "trip_hour",
    hour(col("lpep_dropoff_datetime")),
)


In [0]:
#
#
# Explore and prepare Data
#
#

In [0]:
# Count the rows of the full taxi DataFrame
df_taxis_ml.count()

Out[108]: 158698393

In [0]:
# Preview the data, including the derived week_day, trip_month and trip_hour columns
df_taxis_ml.display()

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,payment_type,congestion_surcharge,Duration,Speed,taxi_colour,week_day,trip_month,trip_hour
1,2019-05-01T00:44:32.000+0000,2019-05-01T00:48:54.000+0000,N,1.0,98,265,1.0,3.8,12.0,0.5,0.5,0.0,0.0,0.3,13.3,2.0,0.0,0.0727777777777777,52.21374045801527,Yellow,Wednesday,5,0
2,2019-05-01T00:36:52.000+0000,2019-05-01T00:40:12.000+0000,N,1.0,74,168,1.0,2.8,9.5,0.5,0.5,0.0,0.0,0.3,10.8,2.0,0.0,0.0555555555555555,50.4,Yellow,Wednesday,5,0
1,2019-05-01T01:03:38.000+0000,2019-05-01T01:25:22.000+0000,N,2.0,132,148,1.0,18.2,52.0,2.5,0.5,9.7,0.0,0.3,65.0,1.0,2.5,0.3622222222222222,50.24539877300613,Yellow,Wednesday,5,1
1,2019-05-01T01:35:19.000+0000,2019-05-01T01:57:26.000+0000,N,1.0,132,97,1.0,18.7,50.0,0.5,0.5,0.0,0.0,0.3,51.3,2.0,0.0,0.3686111111111111,50.7309721175584,Yellow,Wednesday,5,1
2,2019-05-01T02:15:40.000+0000,2019-05-01T02:39:00.000+0000,N,2.0,132,120,2.0,20.54,52.0,0.0,0.5,11.78,6.12,0.3,70.7,1.0,0.0,0.3888888888888889,52.817142857142855,Yellow,Wednesday,5,2
2,2019-05-01T02:12:38.000+0000,2019-05-01T02:40:04.000+0000,N,1.0,132,248,1.0,23.29,61.0,0.5,0.5,0.0,6.12,0.3,68.42,1.0,0.0,0.4572222222222222,50.93803159173755,Yellow,Wednesday,5,2
2,2019-05-01T03:41:11.000+0000,2019-05-01T03:57:59.000+0000,N,2.0,145,132,1.0,15.36,52.0,0.0,0.5,14.73,6.12,0.3,73.65,1.0,0.0,0.28,54.85714285714285,Yellow,Wednesday,5,3
2,2019-05-01T04:55:06.000+0000,2019-05-01T05:27:37.000+0000,N,1.0,132,220,1.0,27.36,71.0,0.5,0.5,0.0,5.76,0.3,78.06,2.0,0.0,0.5419444444444445,50.48487954894925,Yellow,Wednesday,5,5
2,2019-05-01T04:09:59.000+0000,2019-05-01T04:23:32.000+0000,N,1.0,33,138,1.0,11.55,31.5,0.5,0.5,0.0,0.0,0.3,32.8,2.0,0.0,0.2258333333333333,51.1439114391144,Yellow,Wednesday,5,4
1,2019-05-01T04:11:36.000+0000,2019-05-01T04:24:48.000+0000,N,1.0,132,138,1.0,11.5,31.5,0.5,0.5,6.55,0.0,0.3,39.35,1.0,0.0,0.22,52.27272727272727,Yellow,Wednesday,5,4


In [0]:
# Check the Python type of the loaded object (expected: a PySpark DataFrame)
type(df_taxis_ml)

Out[6]: pyspark.sql.dataframe.DataFrame

In [0]:
# Print the schema: column names, data types and nullability
df_taxis_ml.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Duration: double (nullable = true)
 |-- Speed: double (nullable = true)
 |-- taxi_colour: string (nullable = true)
 |-- week_day: string (nullable = true)
 |-- trip_month: int

In [0]:
# List the column names available for feature selection
df_taxis_ml.columns

Out[46]: ['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge',
 'Duration',
 'Speed',
 'taxi_colour',
 'week_day',
 'trip_month',
 'trip_hour']

In [0]:
# Build the modelling DataFrame: keep only trips dropped off before 1 April 2022
model_df = df_taxis_ml.where(
    col("lpep_dropoff_datetime") < '2022-04-01T00:00:00.000+0000'
)

In [0]:
# Numerical columns used as model features (and for the correlation check)
num_col = [
    'VendorID',
    'RatecodeID',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'Duration',
    'Speed',
    'trip_month',
    'trip_hour',
]


In [0]:
# Correlation between each candidate numeric feature and the target (total_amount).
# Column types are read from the DataFrame schema instead of sampling the first row:
# this needs no extra Spark job per column and still skips a string column whose
# first value happens to be null.
column_types = dict(df_taxis_ml.dtypes)
for i in num_col:
    if column_types[i] != 'string':
        print( "Correlation to total_amount for ", i, df_taxis_ml.stat.corr('total_amount',i))

Correlation to total_amount for  VendorID 0.003646834074845108
Correlation to total_amount for  RatecodeID 0.008012624388208607
Correlation to total_amount for  PULocationID -0.007003119451570852
Correlation to total_amount for  DOLocationID -0.007786706832883725
Correlation to total_amount for  passenger_count -0.0024246995477172407
Correlation to total_amount for  trip_distance 0.08773930552533706
Correlation to total_amount for  Duration 0.07896489026648383
Correlation to total_amount for  Speed 0.051940679015136035
Correlation to total_amount for  trip_month 0.0034782202389642167
Correlation to total_amount for  trip_hour 0.0011974648065269909


In [0]:
# Every column the model needs: numeric features, categorical features and the target
cols_list = [
    'VendorID',
    'RatecodeID',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'Duration',
    'Speed',
    'trip_month',
    'trip_hour',
    'taxi_colour',
    'week_day',
    'total_amount',
]

In [0]:
# Start with no pipeline stages; they are appended in the cells below
stages = list()


In [0]:
# Drop rows with a null in any model column (cols_list holds exactly these columns)
model_df = model_df.na.drop(subset=cols_list)


In [0]:
# Categorical columns that get indexed and one-hot encoded
cat_cols = [
    'taxi_colour',
    'week_day',
]

In [0]:
# For each categorical column, queue a StringIndexer (<col> -> <col>_ind)
# followed by a OneHotEncoder (<col>_ind -> <col>_ohe)
for cat_col in cat_cols:
    stages.extend([
        StringIndexer(inputCol=cat_col, outputCol=f"{cat_col}_ind"),
        OneHotEncoder(inputCols=[f"{cat_col}_ind"], outputCols=[f"{cat_col}_ohe"]),
    ])

In [0]:
# Names of the one-hot encoded columns: each categorical column name plus "_ohe"
cat_cols_ohe = [name + "_ohe" for name in cat_cols]

In [0]:
# Assemble the encoded categorical columns followed by the numeric columns
# into a single "features" vector
vector_assembler = VectorAssembler(
    inputCols=[*cat_cols_ohe, *num_col],
    outputCol="features",
)

In [0]:
# The assembler runs last, after all indexers and encoders
stages.append(vector_assembler)


In [0]:
# Build the pipeline from the collected stages (indexers, encoders, then the assembler)
pipeline = Pipeline(stages=stages)


In [0]:
# Fit the pipeline stages on the model DataFrame
pipeline_model = pipeline.fit(model_df)


In [0]:
# Apply the fitted pipeline to the model DataFrame, adding the stage output columns.
# NOTE(review): model_df is overwritten with its transformed version, so this cell is
# presumably not safe to run twice without re-running the cells above — confirm.
model_df = pipeline_model.transform(model_df)

In [0]:
# Keep only the assembled feature vector plus the model columns, then preview
model_df = model_df.select('features', *cols_list)
model_df.show()

+--------------------+--------+----------+------------+------------+---------------+-------------+-------------------+------------------+----------+---------+-----------+--------+------------+
|            features|VendorID|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|           Duration|             Speed|trip_month|trip_hour|taxi_colour|week_day|total_amount|
+--------------------+--------+----------+------------+------------+---------------+-------------+-------------------+------------------+----------+---------+-----------+--------+------------+
|[1.0,0.0,1.0,0.0,...|       1|       1.0|          48|         234|            1.0|          2.1|0.13972222222222222|15.029821073558649|         2|        1|     Yellow|  Friday|        12.3|
|[1.0,0.0,1.0,0.0,...|       1|       1.0|         230|          93|            1.0|          9.8| 0.5136111111111111|19.080584099513253|         2|        1|     Yellow|  Friday|        33.3|
|[1.0,0.0,1.0,0.0,...|       1|    

In [0]:
#
#
# Train and Test (Linear Regression)
#
#

In [0]:
# Use the prepared model DataFrame as the input of the linear regression section
lr_model = model_df

In [0]:
# Random 75% / 25% train-test split; the fixed seed makes the split repeatable
train_lr, test_lr = lr_model.randomSplit(weights=[0.75, 0.25], seed=8)


In [0]:
# Persist the train split as a Delta table.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the write
# fails as soon as the target path already holds data from a previous run.
train_lr.write.format("delta").mode("overwrite").save('/dbfs/traintest')


In [0]:
# Reload the train split from the Delta table written above
train_lr = spark.read.load('/dbfs/traintest', format="delta")

In [0]:
# Persist the test split as a Delta table.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the write
# fails as soon as the target path already holds data from a previous run.
test_lr.write.format("delta").mode("overwrite").save('/dbfs/testtest')


In [0]:
# Reload the test split from the Delta table written above
test_lr = spark.read.load('/dbfs/testtest', format="delta")

In [0]:
# Configure the linear regression: total_amount is the label, 'features' the input vector
Regressor = LinearRegression(
    labelCol='total_amount',
    featuresCol='features',
)


In [0]:
# Fit the linear regression on the train split
lr_model_fit = Regressor.fit(train_lr)

In [0]:
# Training summary of the fitted linear regression (source of the train RMSE below)
lr_summary = lr_model_fit.summary


In [0]:
# Root mean squared error of the linear regression on the train split
lr_train_rmse = lr_summary.rootMeanSquaredError


In [0]:
# Show the linear regression RMSE on the train split
print(lr_train_rmse)


163.594006516476


In [0]:
# Score the train split with the fitted linear regression (adds a prediction column)

lr_train_pred = lr_model_fit.transform(train_lr)


In [0]:
# Preview the train split together with its predictions

lr_train_pred.show()


+--------------------+--------+----------+------------+------------+---------------+-------------+--------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|            features|VendorID|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|            Duration|             Speed|trip_month|trip_hour|taxi_colour|week_day|total_amount|        prediction|
+--------------------+--------+----------+------------+------------+---------------+-------------+--------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|(17,[0,1,7,8,9,10...|       1|       1.0|           4|         107|            0.0|          1.0| 0.08527777777777777|11.726384364820849|         3|        0|     Yellow|Thursday|        10.8|  9.92933277448092|
|(17,[0,1,7,8,9,10...|       1|       1.0|           7|           7|            0.0|          0.8|0.057777777777777775|13.846153846153847|         3

In [0]:
# Score the test split with the fitted linear regression (adds a prediction column)
lr_test_pred = lr_model_fit.transform(test_lr)


In [0]:
# Preview the test split together with its predictions
lr_test_pred.show()

+--------------------+--------+----------+------------+------------+---------------+-------------+-------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|            features|VendorID|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|           Duration|             Speed|trip_month|trip_hour|taxi_colour|week_day|total_amount|        prediction|
+--------------------+--------+----------+------------+------------+---------------+-------------+-------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|(17,[0,1,7,8,9,10...|       1|       1.0|          25|         181|            0.0|          1.2|0.09444444444444444|12.705882352941176|         3|        0|     Yellow|Thursday|        10.1|10.739331063985684|
|(17,[0,1,7,8,9,10...|       1|       1.0|          45|         211|            0.0|          1.0|0.08333333333333333|              12.0|         3|    

In [0]:
# Evaluate the fitted linear regression on the test split (summary used for the test RMSE)
lr_test_evaluate = lr_model_fit.evaluate(test_lr)


In [0]:
# Root mean squared error of the linear regression on the test split
lr_test_rmse = lr_test_evaluate.rootMeanSquaredError


In [0]:
# Show the linear regression RMSE on the test split
print(lr_test_rmse)

116.26498734210543


In [0]:
#
#
# Train and Test (Decision Tree)
#
#

In [0]:
# Alias the prepared model DataFrame for the decision tree section.
# NOTE(review): tree_model is not referenced by the later cells shown here, which
# fit and evaluate on train_lr / test_lr instead — confirm whether it is still needed.
tree_model = model_df

In [0]:
# Configure the decision tree regressor: total_amount is the label, 'features' the input vector
dt = DecisionTreeRegressor(
    labelCol='total_amount',
    featuresCol='features',
)

In [0]:
# Fit the decision tree on the train split (train_lr, the same split used for the linear regression)

dt_model = dt.fit(train_lr)

In [0]:
# Predict on the train data
dt_train_predictions = dt_model.transform(train_lr)

In [0]:
# Preview the train split together with the decision tree predictions
dt_train_predictions.show()

+--------------------+--------+----------+------------+------------+---------------+-------------+--------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|            features|VendorID|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|            Duration|             Speed|trip_month|trip_hour|taxi_colour|week_day|total_amount|        prediction|
+--------------------+--------+----------+------------+------------+---------------+-------------+--------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|(17,[0,1,7,8,9,10...|       1|       1.0|           4|         107|            0.0|          1.0| 0.08527777777777777|11.726384364820849|         3|        0|     Yellow|Thursday|        10.8|11.053978025750846|
|(17,[0,1,7,8,9,10...|       1|       1.0|           7|           7|            0.0|          0.8|0.057777777777777775|13.846153846153847|         3

In [0]:
# Count the rows of the scored train split
dt_train_predictions.count()

Out[40]: 112994423

In [0]:
# Build the RMSE evaluator for the train predictions (total_amount vs. prediction);
# the metric itself is computed in the next cell
dt_evaluate_train = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)

In [0]:
# Extract RMSE for train
rmse_train = dt_evaluate_train.evaluate(dt_train_predictions)

In [0]:
# Report the decision tree RMSE on the train split
print("Root Mean Squared Error (RMSE) on train data = %g" % rmse_train)

Root Mean Squared Error (RMSE) on train data = 163.589


In [0]:
# Score the test split with the fitted decision tree (adds a prediction column)
dt_test_predictions = dt_model.transform(test_lr)

In [0]:
# Preview the test split together with the decision tree predictions
dt_test_predictions.show()

+--------------------+--------+----------+------------+------------+---------------+-------------+-------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|            features|VendorID|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|           Duration|             Speed|trip_month|trip_hour|taxi_colour|week_day|total_amount|        prediction|
+--------------------+--------+----------+------------+------------+---------------+-------------+-------------------+------------------+----------+---------+-----------+--------+------------+------------------+
|(17,[0,1,7,8,9,10...|       1|       1.0|          25|         181|            0.0|          1.2|0.09444444444444444|12.705882352941176|         3|        0|     Yellow|Thursday|        10.1|11.053978025750846|
|(17,[0,1,7,8,9,10...|       1|       1.0|          45|         211|            0.0|          1.0|0.08333333333333333|              12.0|         3|    

In [0]:
# Build the RMSE evaluator for the test predictions (total_amount vs. prediction);
# the metric itself is computed in the next cell
dt_evaluate_test = RegressionEvaluator(
    metricName="rmse",
    labelCol="total_amount",
    predictionCol="prediction",
)

In [0]:
# Extract RMSE for test
rmse_test = dt_evaluate_test.evaluate(dt_test_predictions)

In [0]:
# Report the decision tree RMSE on the test split
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse_test)

Root Mean Squared Error (RMSE) on test data = 116.259
