<a href="https://colab.research.google.com/github/fatma-ds/Chicago_taxi2016/blob/main/Chicago_taxi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Chicago Taxi Rides_2016

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import numpy as np
import pandas as pd 
import findspark 
from pyspark.sql import SparkSession 


#Load Module
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *



In [None]:

findspark.init()

spark = SparkSession\
    .builder\
    .appName("chicago_taxi_trips")\
    .getOrCreate()
#Creat Spark Session
spark=SparkSession.builder.master('local').appName('Regression taxi trips').getOrCreate()

#Load dataset
data=spark.read.csv('/content/chicago_taxi_trips_2016_*.csv',inferSchema=True, header=True)
data.show(5)

+-------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----+-----+------+----------+------------+-------+---------------+----------------+----------------+-----------------+
|taxi_id|trip_start_timestamp| trip_end_timestamp|trip_seconds|trip_miles|pickup_census_tract|dropoff_census_tract|pickup_community_area|dropoff_community_area| fare|tips|tolls|extras|trip_total|payment_type|company|pickup_latitude|pickup_longitude|dropoff_latitude|dropoff_longitude|
+-------+--------------------+-------------------+------------+----------+-------------------+--------------------+---------------------+----------------------+-----+----+-----+------+----------+------------+-------+---------------+----------------+----------------+-----------------+
|     85| 2016-01-13 06:15:00|2016-01-13 06:15:00|         180|       0.4|               null|                null|                   24|        

In [None]:
#checking the numbers of rows
# we have about 20 M records 
rows = data.count()
print(f"DataFrame Rows count : {rows}")

DataFrame Rows count : 19866157


## drop missing values 

In [None]:
#Dropped "pickup_census_tract" column because it is all null
#dropping "pickup_census_tract" column improved model performance
df = data.drop("pickup_census_tract")
df = df.na.drop()
df.count()

7564042

In [None]:
#first select features and label
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

df1=df.select("fare","trip_seconds","trip_miles")
df1.show(5)

+-----+------------+----------+
| fare|trip_seconds|trip_miles|
+-----+------------+----------+
|  5.0|         180|       0.0|
|  7.0|         480|       1.3|
| 7.25|         420|       0.0|
| 6.25|         420|       0.0|
|10.75|         720|       0.0|
+-----+------------+----------+
only showing top 5 rows



In [None]:
df1.printSchema()

root
 |-- fare: double (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)



In [None]:
# converting fare and trip_miles to integer
df1 = df1.withColumn("fare", df1["fare"].cast(IntegerType()))
df1 = df1.withColumn("trip_miles", df1["trip_miles"].cast(IntegerType()))
df1.show(5)

+----+------------+----------+
|fare|trip_seconds|trip_miles|
+----+------------+----------+
|   5|         180|         0|
|   7|         480|         1|
|   7|         420|         0|
|   6|         420|         0|
|  10|         720|         0|
+----+------------+----------+
only showing top 5 rows



In [None]:
df1.printSchema()

root
 |-- fare: integer (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: integer (nullable = true)



In [None]:
#checking null values and drop them 
df1 = df1.na.drop()
df1

DataFrame[fare: int, trip_seconds: int, trip_miles: int]

In [None]:
df1.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
fare,7564042,11.585207221218496,11.980659225126931,0,4444
trip_seconds,7564042,739.3545963917176,715.5603996367231,0,79140
trip_miles,7564042,1.8509856238238762,4.946990067093426,0,995


In [None]:
from pyspark.ml.feature import VectorAssembler

#Assembler combine all features/predictors into a vector
assembler=VectorAssembler(inputCols=("trip_seconds","trip_miles"),
                         outputCol=('features'))
# we transfor the data 
newdata=assembler.transform(df1).select(col('features'),(col('fare').cast('Int').alias('label')))
newdata.show(5)

+-----------+-----+
|   features|label|
+-----------+-----+
|[180.0,0.0]|    5|
|[480.0,1.0]|    7|
|[420.0,0.0]|    7|
|[420.0,0.0]|    6|
|[720.0,0.0]|   10|
+-----------+-----+
only showing top 5 rows



In [None]:
#Split dataset into 80/20

splitdata=newdata.randomSplit([0.8,0.2])
train=splitdata[0]
test=splitdata[1]

print('train:\n', train.count(), '\ntest:\n',test.count())

train:
 6050601 
test:
 1513441


## Build a linear regression with elastic net Regularizers to  forecast fare using trip_seconds, trip_miles

In [None]:
#now we will build the linear regression model
from pyspark.ml.regression import LinearRegression
# Crearte a linear regression model object
lrmodel = LinearRegression(
    labelCol="label",featuresCol="features", maxIter=10, regParam=0.1, elasticNetParam=0.8)
#Train our model on the training dataset
model = lrmodel.fit(train)
print ("Model Trained")

Model Trained


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
# forecast the Test dataset using transform method
predictions1=model.transform(test)
predictions1.show(5)
# setup the metric to use, such as root meaned squared error, which is very popular for regression project
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator.evaluate(predictions1)
r2 = evaluator2.evaluate(predictions1)
print ("Root Mean Square Error (RMSE):", rmse)
print ("R2 :", r2)

+---------+-----+------------------+
| features|label|        prediction|
+---------+-----+------------------+
|(2,[],[])|    0|1.7249579238275843|
|(2,[],[])|    0|1.7249579238275843|
|(2,[],[])|    0|1.7249579238275843|
|(2,[],[])|    0|1.7249579238275843|
|(2,[],[])|    0|1.7249579238275843|
+---------+-----+------------------+
only showing top 5 rows

Root Mean Square Error (RMSE): 5.25491120474526
R2 : 0.7978889644913532


## Build a simple tree model

In [None]:

from pyspark.ml import Pipeline
#import the libary for tree regression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer


# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(newdata)


dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline, then the data was process by the feature Indexer first,
# then the processed data will be passed to the tree model
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model, this also runs the indexer.
model2 = pipeline.fit(train)


In [None]:
# Make predictions on the test dataset
predictions = model2.transform(test)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Evaluate our model on the test dataset
# Select (prediction, true label) and compute test error using root mean squared error for regression
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
# evaluate the model performance on the predicted dataset
rmse = evaluator.evaluate(predictions)
r2 = evaluator2.evaluate(predictions)
print ("Root Mean Square Error (RMSE):", rmse)
print ("R2 :", r2)

treeModel = model2.stages[1]
# summary only
print(treeModel)

+-----------------+-----+---------+
|       prediction|label| features|
+-----------------+-----+---------+
|4.211091781238388|    0|(2,[],[])|
|4.211091781238388|    0|(2,[],[])|
|4.211091781238388|    0|(2,[],[])|
|4.211091781238388|    0|(2,[],[])|
|4.211091781238388|    0|(2,[],[])|
+-----------------+-----+---------+
only showing top 5 rows

Root Mean Square Error (RMSE): 3.775949971166071
R2 : 0.8956453690666406
DecisionTreeRegressionModel: uid=DecisionTreeRegressor_22b1f98c5546, depth=5, numNodes=63, numFeatures=2


##  Build a random forest model

In [None]:
from pyspark.ml.regression import RandomForestRegressor

# Train a RandomForest model.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(newdata)

rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model3 = pipeline.fit(train)


In [None]:

# Make predictions on the test dataset
predictions = model3.transform(test)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
# evaluate the model performance on the test dataset using root mean squared error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
# evaluate the model performance on the predicted dataset
rmse = evaluator.evaluate(predictions)
r2 = evaluator2.evaluate(predictions)
print ("Root Mean Square Error (RMSE):", rmse)
print ("R2 :", r2)


rfModel = model3.stages[1]
print(rfModel)  # summary only

+-----------------+-----+---------+
|       prediction|label| features|
+-----------------+-----+---------+
|5.447869659183243|    0|(2,[],[])|
|5.447869659183243|    0|(2,[],[])|
|5.447869659183243|    0|(2,[],[])|
|5.447869659183243|    0|(2,[],[])|
|5.447869659183243|    0|(2,[],[])|
+-----------------+-----+---------+
only showing top 5 rows

Root Mean Square Error (RMSE): 3.7364716003591947
R2 : 0.8978160622195315
RandomForestRegressionModel: uid=RandomForestRegressor_d29bf69184b9, numTrees=20, numFeatures=2


## Build a Gradient-Boosted Tree model

In [None]:
from pyspark.ml.regression import GBTRegressor
# Train a GBT model.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(newdata)

gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer first, then pass the processed data to the gbt model.
model4 = pipeline.fit(train)


In [None]:

# Make predictions on the test dataset
predictions = model4.transform(test)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error using rmse for regression
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2 = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="r2")
# evaluate the model performance on the predicted dataset
rmse = evaluator.evaluate(predictions)
r2 = evaluator2.evaluate(predictions)
print ("Root Mean Square Error (RMSE):", rmse)
print ("R2 :", r2)


# our pipeline has two steps/stages stages=[featureIndexer, gbt]
# stages[0]= featureIndexer; stages[1] = gbt
gbtModel = model4.stages[1]
print(gbtModel)  # summary only

+-----------------+-----+---------+
|       prediction|label| features|
+-----------------+-----+---------+
|4.189686399593084|    0|(2,[],[])|
|4.189686399593084|    0|(2,[],[])|
|4.189686399593084|    0|(2,[],[])|
|4.189686399593084|    0|(2,[],[])|
|4.189686399593084|    0|(2,[],[])|
+-----------------+-----+---------+
only showing top 5 rows

Root Mean Square Error (RMSE): 3.5639460686984554
R2 : 0.9070345640302035
GBTRegressionModel: uid=GBTRegressor_e3f032210b20, numTrees=10, numFeatures=2


 ### perform hyper parameter tuning on the  Gradient-Boosted Tree model since it gave the lowest RMSE score and highest R2

In [None]:
# we first need to build a parameter grid using ParamGridBuilder
# then select the optimal parameter using CrossValidator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Train the Gradient-boosted model.
gb = GBTRegressor(featuresCol="features" ,labelCol="label")
gbparamGrid = (ParamGridBuilder()
             .addGrid(gb.maxDepth, [2])
             .addGrid(gb.maxBins, [5])
             .addGrid(gb.maxIter, [10])
             .build())


gbevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="rmse")

#set the estimator to be the best model above, and paragrid, the CrossValidator, the number of fold
gbcv = CrossValidator(estimator = gb, estimatorParamMaps= gbparamGrid,evaluator = gbevaluator,numFolds = 5)



# fit on the training dataset
gbcvModel = gbcv.fit(train)


#evaluate on the test dataset
gbpredictions = gbcvModel.transform(test)




print('RMSE:', gbevaluator.evaluate(gbpredictions))





RMSE: 6.422820798040719
