### Description
Imagine that you are an NYC taxi fleet manager. Every 15 minutes, your goal is to make sure your company has enough cars to cover very large spikes in demand across the city (for example, demand above the 90th percentile). If you detect a large spike in a specific area, you direct the cars in the neighbouring zones to go there. Here, let's assume that Zone 1 is the only area that truly matters to you. The dataset includes the number of pickups in Zone 1, together with the first and second lagged pickup counts for Zone 1 and its neighbouring zones. Weather information and time-of-day and time-of-week indicators are also included.
### Aim: 
At each 15-minute interval, predict whether the next interval will have a demand spike ("stress"). We compare four classifiers: logistic regression, decision tree, random forest, and gradient-boosted trees.

In [2]:
import findspark
findspark.init("/home/meghdad/spark-2.4.5-bin-hadoop2.7")

In [6]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("NYCtaxiSpike").getOrCreate()
df=spark.read.csv("NYC_taxis_weather_2016_with_dummies.csv",header=True,inferSchema=True)

In [11]:
print("No. of rows and columns are: ", (df.count(),len(df.columns)))
df.printSchema()

No. of rows and columns are:  (17472, 22)
root
 |-- datetime: timestamp (nullable = true)
 |-- pickups1: integer (nullable = true)
 |-- pickups17_lag1: double (nullable = true)
 |-- pickups17_lag2: double (nullable = true)
 |-- pickups1_lag1: double (nullable = true)
 |-- pickups1_lag2: double (nullable = true)
 |-- pickups21_lag1: double (nullable = true)
 |-- pickups21_lag2: double (nullable = true)
 |-- pickups28_lag1: double (nullable = true)
 |-- pickups28_lag2: double (nullable = true)
 |-- temp: double (nullable = true)
 |-- prcp: double (nullable = true)
 |-- fog: integer (nullable = true)
 |-- rain_drizzle: integer (nullable = true)
 |-- time_of_day_afternoon: integer (nullable = true)
 |-- time_of_day_afternoon rush: integer (nullable = true)
 |-- time_of_day_evening: integer (nullable = true)
 |-- time_of_day_lunch time: integer (nullable = true)
 |-- time_of_day_morning: integer (nullable = true)
 |-- time_of_day_morning rush: integer (nullable = true)
 |-- time_of_day_nigh

In [34]:
## use the approxQuantile method, which implements a variation of the Greenwald-Khanna algorithm
## 0.001 is the relative error: the lower the value, the more accurate the result
## and the more expensive the computation
stress_treshold=df.approxQuantile("pickups1",[0.90],0.001)[0]
stress_treshold

324.0
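To make the role of the 90th-percentile threshold concrete, here is a minimal pure-Python sketch. It uses an exact nearest-rank percentile on a handful of made-up pickup counts, not the approximate Greenwald-Khanna variant that `approxQuantile` runs on the full DataFrame. The function name `nearest_rank_percentile` and the sample values are assumptions for illustration only.

```python
import math

def nearest_rank_percentile(values, q):
    """Return the smallest value v such that at least a fraction q of values are <= v."""
    if not values:
        raise ValueError("values must be non-empty")
    if not 0 < q <= 1:
        raise ValueError("q must be in (0, 1]")
    ordered = sorted(values)
    rank = math.ceil(q * len(ordered))  # 1-based nearest rank
    return ordered[rank - 1]

# Hypothetical pickup counts for ten 15-minute intervals (not taken from the dataset)
pickups = [120, 95, 310, 150, 400, 220, 180, 90, 330, 275]

threshold = nearest_rank_percentile(pickups, 0.90)

# Same rule as the notebook: an interval is a spike when it is strictly above the threshold
spikes = [int(p > threshold) for p in pickups]

print("threshold:", threshold)
print("spike flags:", spikes)
```

Because the comparison is strict, intervals exactly at the threshold are not flagged, so roughly the top 10% of intervals end up labelled as spikes.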

In [40]:
## create a new column that is 1 for a "stress" scenario and 0 otherwise
from pyspark.sql.functions import when, col 
df=df.withColumn("spike",when(col("pickups1")>stress_treshold,1).otherwise(0))
df.groupBy("spike").count().show()

+-----+-----+
|spike|count|
+-----+-----+
|    1| 1748|
|    0|15724|
+-----+-----+
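Since the label is defined by the 90th percentile, the classes are imbalanced by construction. The short calculation below, using only the two counts printed above, shows the accuracy a trivial classifier would reach by always predicting "no spike". It is a useful reference point when reading any accuracy figure for this task.

```python
# Class counts copied from the groupBy output above
spike_count = 1748
no_spike_count = 15724
total = spike_count + no_spike_count

# Share of intervals labelled as spikes
spike_share = spike_count / total

# Accuracy of a classifier that always predicts the majority class (0)
baseline_accuracy = no_spike_count / total

print(f"total intervals: {total}")
print(f"share of spike intervals: {spike_share:.4f}")
print(f"accuracy of always predicting 'no spike': {baseline_accuracy:.4f}")
```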



In [57]:
from pyspark.ml.feature import VectorAssembler
features=df.columns[2:-1]
assembler=VectorAssembler(inputCols=features,outputCol="features")
final_data=assembler.transform(df).select("spike","features")
final_data.show(5)

+-----+--------------------+
|spike|            features|
+-----+--------------------+
|    0|(20,[0,1,2,3,4,5,...|
|    0|(20,[0,1,2,3,4,5,...|
|    0|(20,[0,1,2,3,4,5,...|
|    1|(20,[0,1,2,3,4,5,...|
|    1|(20,[0,1,2,3,4,5,...|
+-----+--------------------+
only showing top 5 rows
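The `(20,[0,1,2,...` entries in the `features` column are sparse vectors, displayed as the vector size, the indices of the non-zero entries, and the corresponding values. The pure-Python sketch below mimics that layout on a small made-up row. The helpers `to_sparse` and `to_dense` are hypothetical and are not part of the PySpark API.

```python
def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [dense[i] for i in indices]
    return len(dense), indices, values

def to_dense(size, indices, values):
    """Rebuild the full vector from its sparse (size, indices, values) form."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Hypothetical feature row: two lagged counts followed by one-hot dummy columns
row = [12.0, 9.0, 0.0, 0.0, 1.0, 0.0]

size, indices, values = to_sparse(row)
print((size, indices, values))

# The round trip recovers the original row
restored = to_dense(size, indices, values)
```

With many one-hot dummy columns, most entries in a row are zero, which is why a sparse layout is a compact fit for this kind of feature vector.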



In [66]:
from pyspark.ml.classification import (LogisticRegression,DecisionTreeClassifier,
                                       RandomForestClassifier,GBTClassifier)

In [142]:
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
 

## create initial classifiers
lr=LogisticRegression(labelCol="spike",featuresCol="features")
dtc=DecisionTreeClassifier(labelCol="spike",featuresCol="features")
rfc=RandomForestClassifier(labelCol="spike",featuresCol="features")
gbt=GBTClassifier(labelCol="spike",featuresCol="features")

## evaluate models
## metricName can be set as f1,weightedPrecision,weightedRecall and accuracy
evaluator=MulticlassClassificationEvaluator(labelCol="spike",predictionCol="prediction",
                                            metricName="accuracy")

## since grid search is computationally expensive, we perform cross validation without it
## CrossValidator still needs a parameter grid, so each grid holds a single fixed value
paramGrid_lr=ParamGridBuilder().addGrid(lr.maxIter,[100]).build()
paramGrid_dtc=ParamGridBuilder().addGrid(dtc.impurity,["gini"]).build()
paramGrid_rfc=ParamGridBuilder().addGrid(rfc.numTrees,[150]).build()
paramGrid_gbt=ParamGridBuilder().addGrid(gbt.maxIter,[25]).build()
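A parameter grid expands into one parameter map per combination of values, so a grid with a single value per parameter yields exactly one candidate model. The sketch below reproduces that expansion in plain Python with `itertools.product`. The helper `build_grid` and the second, larger grid are assumptions for illustration and are not used in this notebook.

```python
from itertools import product

def build_grid(grid):
    """Expand {param: [values]} into a list of {param: value} combinations."""
    names = list(grid)
    combos = product(*(grid[name] for name in names))
    return [dict(zip(names, combo)) for combo in combos]

# One value per parameter, as in the cell above: a single combination
single = build_grid({"numTrees": [150]})

# A hypothetical larger grid: 2 x 3 = 6 combinations
larger = build_grid({"numTrees": [50, 150], "maxDepth": [5, 10, 15]})

num_folds = 3
print("combinations (single):", len(single))
print("combinations (larger):", len(larger))
# Each combination is fitted once per fold during cross validation
print("fits during CV (larger):", len(larger) * num_folds)
```

This is why fixing one value per model keeps the run cheap: with 3 folds, each classifier is fitted three times during cross validation instead of three times per grid point.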

In [141]:
estimators=[lr,dtc,rfc,gbt]
paramGrids=[paramGrid_lr,paramGrid_dtc,paramGrid_rfc,paramGrid_gbt]
names=["Logistic regression","Decision tree","Random forest","Gradient boosting"]

In [143]:
for estimator, paramGrid, name in zip(estimators,paramGrids, names):
    ## create 3-fold cross validation
    cross_val=CrossValidator(estimator=estimator,
                         estimatorParamMaps=paramGrid,
                         evaluator=evaluator,
                         numFolds=3)
    cvModel=cross_val.fit(final_data)
    ## extract the cross-validated metric for each parameter combination
    ## (one value per model here, because each grid has a single combination)
    accuracy_matrix=cvModel.avgMetrics
    ## calculate the average accuracy
    average_accuracy=sum(accuracy_matrix)/len(accuracy_matrix)
    print("Average accuracy of "+name+" model is: "+ str(average_accuracy))

Average accuracy of Logistic regression model is: 0.9531042016971911
Average accuracy of Decision tree model is: 0.9470960436429154
Average accuracy of Random forest model is: 0.9502260030538972
Average accuracy of Gradient boosting model is: 0.9500213721666722
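The evaluator above can also report `f1`, `weightedPrecision`, and `weightedRecall`. As a hedged illustration of why those may be worth checking alongside accuracy on an imbalanced label, the sketch below computes accuracy, precision, recall, and F1 for the spike class from a confusion matrix. The four counts are invented for the example and are not results from the models in this notebook.

```python
# Hypothetical confusion-matrix counts for the spike class (made up, not model output)
tp = 120   # spikes predicted as spikes
fn = 55    # spikes missed
fp = 30    # non-spikes flagged as spikes
tn = 1545  # non-spikes predicted as non-spikes

total = tp + fn + fp + tn

accuracy = (tp + tn) / total
precision = tp / (tp + fp)   # of the predicted spikes, how many were real
recall = tp / (tp + fn)      # of the real spikes, how many were caught
f1 = 2 * precision * recall / (precision + recall)

print(f"accuracy:  {accuracy:.4f}")
print(f"precision: {precision:.4f}")
print(f"recall:    {recall:.4f}")
print(f"f1:        {f1:.4f}")
```

In this made-up case accuracy is above 0.95 while recall on the spike class is below 0.70, which shows how a high accuracy can coexist with a sizeable share of missed spikes.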
