# NY CAB DATA
## Price and Trip duration Prediction

### IMPORTANT: Data is stored into Postgres Image

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
import psycopg2 as pg
import numpy as np 
import warnings
import os
warnings.filterwarnings('ignore')
pd.options.plotting.backend = "plotly"

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Setting Up Spark 

In [3]:
jar_path = os.path.join(os.path.dirname(os.path.abspath('')),'docker_sql','postgresql-42.5.0.jar')
jar_path, os.path.isfile(jar_path)

('c:\\Users\\Olist\\OneDrive\\Ambiente de Trabalho\\Projects\\ny_cab_app\\docker_sql\\postgresql-42.5.0.jar',
 True)

In [4]:
postgres_url = f"jdbc:postgresql://localhost:5432/ny_taxi"

In [5]:
spark = SparkSession.builder.appName("ML_model").master("local").config("spark.jars", jar_path).config("spark.driver.memory", "15g").getOrCreate()

### Loading Data

In [6]:
df = spark.read.format("jdbc").options(
                url=postgres_url,
                driver="org.postgresql.Driver",
                dbtable='ny_taxi',
                user='root',
                password='root'
                ).load()
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-06-01 00:25:41|  2022-06-01 00:48:22|            1.0|         11.0|       1.0|                 N|          70|          48|           1|       32.0|  3.0|    0.5|       2.

In [7]:
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



## Data Cleaning And Validation

In [8]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|         132448|            0|    132448|            132448|           0|           0|           0|          0|    0|      0|         

#### Feature Creation

In [9]:
df = df.withColumn('pickup_day_shift', round(hour(df.tpep_pickup_datetime)/5,0))

In [10]:
df = df.withColumn('trip_duration', (col("tpep_dropoff_datetime").cast("long") - col('tpep_pickup_datetime').cast("long"))/3600)

In [11]:
df = df.withColumn('pickup_hour', hour(col('tpep_pickup_datetime')))
df = df.withColumn('pickup_day', dayofmonth(col('tpep_pickup_datetime')))

In [12]:
df.describe().toPandas()

Unnamed: 0,summary,VendorID,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,...,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_day_shift,trip_duration,pickup_hour,pickup_day
0,count,3558124.0,3425676.0,3558124.0,3425676.0,3425676,3558124.0,3558124.0,3558124.0,3558124.0,...,3558124.0,3558124.0,3558124.0,3558124.0,3425676.0,3425676.0,3558124.0,3558124.0,3558124.0,3558124.0
1,mean,1.71421625553241,1.3990418241538312,5.968216000903217,1.4181633055782277,,164.60850830381403,162.41222846646153,1.1818587547820143,15.249263010518249,...,2.7958426406719723,0.562215619806497,0.2960561239765024,22.118415234565592,2.2823746320434277,0.0957336595755115,2.8289208020855936,7.021806643751993,14.169015750996872,15.277089837228832
2,stddev,0.4877038929139574,0.960712571772369,594.1291221670459,5.702573931569726,,65.54112126953186,70.16921804762987,0.5104356496092229,212.1835543481579,...,3.581684820270223,2.121172505401807,0.0481092526932312,212.46228920765853,0.7490436767387179,0.3357336222458381,1.212871821309035,1075.2233002350804,5.827150065751408,8.614052677355653
3,min,1.0,0.0,0.0,1.0,N,1.0,1.0,0.0,-907.0,...,-80.08,-63.2,-0.3,-911.55,-2.5,-1.25,0.0,-11.306666666666668,0.0,1.0
4,max,6.0,9.0,307007.11,99.0,Y,265.0,265.0,4.0,395844.94,...,1400.16,800.09,0.3,395848.24,2.75,1.25,5.0,172034.25305555554,23.0,31.0


In [13]:
df.approxQuantile("trip_duration", [0.8],0.05)

[0.3877777777777778]

#### Drop Non Numeric columns

In [14]:
df = df.drop(*('tpep_pickup_datetime','tpep_dropoff_datetime','store_and_fwd_flag'))

#### Filtering All Negative Values

In [15]:
df.select([(when(col(c)>=0, col(c))).alias(c) for c in df.columns]).describe().toPandas()

Unnamed: 0,summary,VendorID,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,...,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_day_shift,trip_duration,pickup_hour,pickup_day
0,count,3558124.0,3425676.0,3558124.0,3425676.0,3558124.0,3558124.0,3558124.0,3535708.0,3547064.0,...,3557709.0,3557040.0,3535453.0,3535411.0,3408036.0,3423248.0,3558124.0,3556903.0,3558124.0,3558124.0
1,mean,1.71421625553241,1.3990418241538312,5.968216000903217,1.4181633055782277,164.60850830381403,162.41222846646153,1.1818587547820143,15.437022924415029,1.026672763727971,...,2.796418324263827,0.5647434468008284,0.2998783183000379,22.37457250883357,2.307128211086972,0.0966881452935925,2.8289208020855936,7.0242221347066325,14.169015750996872,15.277089837228832
2,stddev,0.4877038929139574,0.960712571772369,594.1291221670459,5.702573931569726,65.54112126953186,70.16921804762987,0.5104356496092229,212.8345808604909,1.2518560199878157,...,3.5804086537432727,2.115188681057374,0.0060406721239371,213.11168712546333,0.6670658499919979,0.3339335514012864,1.212871821309035,1075.4078257923095,5.827150065751408,8.614052677355653
3,min,1.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
4,max,6.0,9.0,307007.11,99.0,265.0,265.0,4.0,395844.94,8.25,...,1400.16,800.09,0.3,395848.24,2.75,1.25,5.0,172034.25305555554,23.0,31.0


In [16]:
df.select([(when(col(c)>=0, col(c))).otherwise(None).alias(c) for c in df.columns]).dropna().describe().toPandas()

Unnamed: 0,summary,VendorID,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,...,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_day_shift,trip_duration,pickup_hour,pickup_day
0,count,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,...,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0,3402996.0
1,mean,1.7021098467350535,1.3995026735265044,3.638474059328231,1.419442456000536,164.69242338221966,162.74214662609066,1.2137557610999248,15.197886374245227,1.0670598643078015,...,2.7678593715652853,0.5610226958855934,0.2998786951447074,22.09809153993338,2.310544443778365,0.0972635583468214,2.834960428986693,6.8215638575850175,14.19986652937588,15.265127846168491
2,stddev,0.4573310303421309,0.9618302087930902,116.120020328652,5.721189634212782,65.13671764539471,70.07469085739774,0.4273746420296052,216.9212335596505,1.260960334271295,...,3.581400112337852,2.1051586831417928,0.0060313147761382,217.1834310896354,0.6616210236680115,0.3348421734167634,1.2069815123200245,1059.1713120893264,5.797116147789476,8.61786106460677
3,min,1.0,0.0,0.0,1.0,1.0,1.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
4,max,2.0,9.0,184340.8,99.0,265.0,265.0,4.0,395844.94,8.25,...,1400.16,800.09,0.3,395848.24,2.75,1.25,5.0,172034.25305555554,23.0,31.0


In [17]:
df = df.select([(when(col(c)>=0, col(c))).otherwise(None).alias(c) for c in df.columns]).dropna()

##### Filtering Outliers and Bad Data with Quantiles

In [18]:
#feature_quantiles = dict([(c,df.approxQuantile(c, [0.9],0.05)) for c in df.columns])

In [19]:
feature_quantiles = {'VendorID': [2.0],
 'passenger_count': [3.0],
 'trip_distance': [8.09],
 'RatecodeID': [1.0],
 'PULocationID': [239.0],
 'DOLocationID': [239.0],
 'payment_type': [2.0],
 'fare_amount': [37.5],
 'extra': [3.0],
 'mta_tax': [0.5],
 'tip_amount': [6.2],
 'tolls_amount': [6.55],
 'improvement_surcharge': [0.3],
 'total_amount': [40.56],
 'congestion_surcharge': [2.5],
 'airport_fee': [1.25],
 'pickup_day_shift': [4.0],
 'trip_duration': [0.5477777777777778],
 'pickup_hour': [21.0],
 'pickup_day': [29.0]}

In [20]:
df.select([when(col(c)<=feature_quantiles[c][0],col(c)).otherwise(None).alias(c) for c in df.columns]).dropna().describe().toPandas()

Unnamed: 0,summary,VendorID,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,fare_amount,extra,...,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,pickup_day_shift,trip_duration,pickup_hour,pickup_day
0,count,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,...,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0,1855609.0
1,mean,1.7396202540513654,1.215039914119839,1.981356282492616,1.0,159.30137599030832,156.90815090894688,1.2143129290707255,10.09790489806822,0.9006732183342511,...,1.969192841810312,0.0227791037874905,0.2999482649581093,15.58373613213189,2.3953793606303915,0.0141395628066041,2.6382233541656674,0.2023686093232868,13.191789326307427,14.779966038103932
2,stddev,0.4388419278421264,0.5381312154058201,1.3832570107558455,0.0,62.93481937910458,65.33477324215734,0.4103449624968691,4.590257663892595,1.0608261203411542,...,1.459534790659587,0.3837827507805387,0.0039392693363832,5.456998344258471,0.5006058881947676,0.1321912844670054,1.1345832637890243,0.1113090472446366,5.431940142331128,8.349464206498286
3,min,1.0,0.0,0.0,1.0,1.0,1.0,1.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
4,max,2.0,3.0,8.09,1.0,239.0,239.0,2.0,37.2,3.0,...,6.2,6.55,0.3,40.55,2.5,1.25,4.0,0.5477777777777778,21.0,29.0


In [21]:
df = df.select([when(col(c)<=feature_quantiles[c][0],col(c)).otherwise(None).alias(c) for c in df.columns if c not in ['trip_itinerary','tpep_pickup_datetime','tpep_dropoff_datetime','store_and_fwd_flag']]).dropna()

#### Creating time related features

In [22]:
df = df.withColumn('trip_itinerary', concat(col('PULocationID').cast("string"),lit("_"),col('DOLocationID').cast("string")))

In [23]:
df_mean = df.groupBy('trip_itinerary').mean('trip_duration')
df_mean.show()

+--------------+-------------------+
|trip_itinerary| avg(trip_duration)|
+--------------+-------------------+
|       230_143|0.17354166666666673|
|        48_148|  0.393946644664466|
|       100_225| 0.4435317460317461|
|         100_4| 0.2975114155251141|
|       140_161|0.24066343669250628|
|       140_142|0.24977813723140782|
|       186_114|0.22385935075784688|
|        125_13|0.15776234567901232|
|       236_125| 0.4441798941798943|
|        141_87|0.31617614638447983|
|       125_230| 0.3204503105590062|
|        209_65| 0.1675992063492064|
|         68_41|0.37737103174603176|
|        137_82|0.37777777777777777|
|        88_162| 0.3005081300813006|
|       142_146|0.31092592592592594|
|        148_61|0.34471870604781985|
|         48_82| 0.4439409722222222|
|        88_236| 0.3850405092592593|
|        142_66|0.47291666666666665|
+--------------+-------------------+
only showing top 20 rows



In [24]:
df  = df.join(df_mean,on='trip_itinerary',how='left')

## Data Analisys

#### Measure Price Correlation with features 

In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation
import matplotlib.pyplot as plt
import seaborn as sns

In [26]:
def plot_corr_matrix(df):
    vector_col = "corr_features"
    assembler = VectorAssembler(inputCols=df.columns, 
                                outputCol=vector_col)
    df_vector = assembler.transform(df).select(vector_col)
    matrix = Correlation.corr(df_vector, vector_col)
    matrix = Correlation.corr(df_vector, vector_col).collect()[0][0]
    corrmatrix = matrix.toArray().tolist()
    fig=plt.figure(figsize=(10, 10))
    ax=fig.add_subplot(111)
    ax.set_title("Correlation Matrix for Specified Attributes")
    sns.heatmap(pd.DataFrame(data=corrmatrix,columns=df.columns),xticklabels=df.columns,yticklabels=df.columns)
    plt.show()

In [27]:
cols = [c for c in df.columns if c not in ['trip_itinerary','tpep_pickup_datetime','tpep_dropoff_datetime','store_and_fwd_flag']]

In [28]:
df_corr = df.select(cols)
#df_corr.show(3)

In [29]:
#plot_corr_matrix(df_corr)

In [30]:
df.groupBy('pickup_day').count().toPandas().sort_values(by='pickup_day').set_index('pickup_day').plot()

## Model Training

In [31]:
from pyspark.ml.regression import GeneralizedLinearRegression,GeneralizedLinearRegressionModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
from pyspark.ml.linalg import Vectors

#### Trip Duration Model

In [32]:
assembler = VectorAssembler(inputCols=['pickup_day_shift','trip_distance','pickup_day','avg(trip_duration)'],
                            outputCol="features")
df_train = assembler.transform(df).select(["features",col("trip_duration").alias("label")])
df_train.show(5)

+--------------------+-------------------+
|            features|              label|
+--------------------+-------------------+
|[0.0,1.01,1.0,0.1...|0.07166666666666667|
|[0.0,3.55,1.0,0.3...|0.26861111111111113|
|[0.0,5.2,1.0,0.39...| 0.4311111111111111|
|[0.0,4.68,1.0,0.3...| 0.3136111111111111|
|[0.0,4.2,1.0,0.39...| 0.3063888888888889|
+--------------------+-------------------+
only showing top 5 rows



In [33]:
glr = GeneralizedLinearRegression(family="gaussian")

paramGrid = ParamGridBuilder().addGrid(glr.regParam, [0.1, 0.01]).build()

tvs = TrainValidationSplit(estimator=glr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           parallelism=1,
                           seed=42,
                           trainRatio=0.8)

In [34]:
model = tvs.fit(df_train)

In [35]:
model.bestModel.summary

Coefficients:
           Feature Estimate Std Error  T Value P Value
       (Intercept)   0.0137    0.0002  70.1213  0.0000
  pickup_day_shift   0.0079    0.0000 183.4332  0.0000
     trip_distance   0.0255    0.0001 483.1267  0.0000
        pickup_day  -0.0004    0.0000 -75.4817  0.0000
avg(trip_duration)   0.6119    0.0009 707.4565  0.0000

(Dispersion parameter for gaussian family taken to be 0.0048)
   Null deviance: 22990.4339 on 1855604 degrees of freedom
Residual deviance: 8916.1886 on 1855604 degrees of freedom
AIC: -4639423.5329

In [36]:
params = [{p.name: v for p, v in m.items()} for m in model.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {model.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, model.validationMetrics)
])

Unnamed: 0,rmse,regParam
0,0.075355,0.1
1,0.069256,0.01


In [39]:
model.write().overwrite().save("file:///C:\\Users\\Olist\\OneDrive\\Ambiente de Trabalho\\Projects\\ny_cab_app\\ML model Development\\model_trip_duration")

In [None]:
persistedModel = GeneralizedLinearRegressionModel.load('file:///C:\\Users\\Olist\\OneDrive\\Ambiente de Trabalho\\Projects\\ny_cab_app\\ML model Development\\model_trip_duration\\bestModel')

In [None]:
testData =   np.array([1,3,1,1])

In [None]:
persistedModel.predict(Vectors.dense(testData))

0.7095633960429458

#### Fare Amount Model

In [40]:
assembler = VectorAssembler(inputCols=['trip_distance','trip_duration'],
                            outputCol="features")
df_train_f = assembler.transform(df).select(["features",col("fare_amount").alias("label")])
df_train_f.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.01,0.071666666...|  5.5|
|[3.55,0.268611111...| 14.0|
|[5.2,0.4311111111...| 20.5|
|[4.68,0.313611111...| 16.5|
|[4.2,0.3063888888...| 15.5|
+--------------------+-----+
only showing top 5 rows



In [41]:
glr = GeneralizedLinearRegression(family="gaussian")
paramGrid = ParamGridBuilder().addGrid(glr.regParam, [0.1, 0.01]).build()
tvs = TrainValidationSplit(estimator=glr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           parallelism=1,
                           seed=42,
                           trainRatio=0.8)

In [42]:
model = tvs.fit(df_train)
model.bestModel.summary

Coefficients:
           Feature Estimate Std Error  T Value P Value
       (Intercept)   0.0137    0.0002  70.1213  0.0000
  pickup_day_shift   0.0079    0.0000 183.4332  0.0000
     trip_distance   0.0255    0.0001 483.1267  0.0000
        pickup_day  -0.0004    0.0000 -75.4817  0.0000
avg(trip_duration)   0.6119    0.0009 707.4565  0.0000

(Dispersion parameter for gaussian family taken to be 0.0048)
   Null deviance: 22990.4339 on 1855604 degrees of freedom
Residual deviance: 8916.1886 on 1855604 degrees of freedom
AIC: -4639423.5329

In [43]:
params = [{p.name: v for p, v in m.items()} for m in model.getEstimatorParamMaps()]
pd.DataFrame.from_dict([
    {model.getEvaluator().getMetricName(): metric, **ps} 
    for ps, metric in zip(params, model.validationMetrics)
])

Unnamed: 0,rmse,regParam
0,0.075355,0.1
1,0.069256,0.01


In [44]:
model.write().save("file:///C:\\Users\\Olist\\OneDrive\\Ambiente de Trabalho\\Projects\\ny_cab_app\\ML model Development\\model_fare_amount")

In [45]:
spark.stop()