# NYC Clustering Problem

# Use case explanation

We are already familiar with the New York City taxi dataset from earlier analytics work. Now it is your turn to answer the following question: which Manhattan area (our borough of interest) should a taxi driver choose to get a high tip?

To make the service more useful and address the use case better, we need a more precise answer to the question "Which Manhattan area should a taxi driver choose to get a high tip?" — a location inside Manhattan, not just a neighborhood.

An area can be a zone of arbitrary size; you need to search for small zones with elevated tips. Each such area is a cluster of taxi rides with elevated tips.

In [3]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.rdd import portable_hash
from pyspark.statcounter import StatCounter
from pyspark.sql.types import StringType,IntegerType,FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_timestamp, current_timestamp, col,expr,unix_timestamp,round,when,hour

import os
import json

from datetime import datetime
from matplotlib import pyplot as plt

In [4]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session for this notebook and keep a handle on
# its SparkContext.
# NOTE(review): driver memory/cores generally only take effect when this call
# launches the driver JVM; in an already-running notebook session they may be
# ignored.
spark = (
    SparkSession.builder
    .appName("Taxi")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.cores", "4")
    .getOrCreate()
)
sc = spark.sparkContext

# Use the schemas below to read the two DataFrames

In [6]:
from pyspark.sql.types import StructField,StructType,LongType,FloatType,ShortType,TimestampType,StringType

# Schema of the rides CSV. Fields are listed in the file's column order,
# because the CSV reader applies a user-supplied schema by position.
taxiRidesSchema = StructType([
    StructField(fieldName, fieldType)
    for fieldName, fieldType in [
        ("rideId", LongType()),
        ("isStart", StringType()),
        ("endTime", TimestampType()),
        ("startTime", TimestampType()),
        ("startLon", FloatType()),
        ("startLat", FloatType()),
        ("endLon", FloatType()),
        ("endLat", FloatType()),
        ("passengerCnt", ShortType()),
        ("taxiId", LongType()),
        ("driverId", LongType()),
    ]
])
                  


In [7]:
# Schema of the fares CSV, in the file's column order.
taxiFaresSchema = StructType([
    StructField(fieldName, fieldType)
    for fieldName, fieldType in [
        ("rideId", LongType()),
        ("taxiId", LongType()),
        ("driverId", LongType()),
        ("startTime", TimestampType()),
        ("paymentType", StringType()),
        ("tip", FloatType()),
        ("tolls", FloatType()),
        ("totalFare", FloatType()),
    ]
])
      

# Reading the datasets

In [9]:
# Taxi rides, read with the explicit taxiRidesSchema. The "inferSchema" option
# is dropped: when a schema is supplied, that schema is used, so requesting
# inference was redundant and misleading.
dfR = (
    spark.read
    .option("header", True)
    .schema(taxiRidesSchema)
    .csv("/FileStore/tables/nycTaxiRides.csv")
)
dfR.show(1)

In [10]:
# Taxi fares, read with the explicit taxiFaresSchema. The "inferSchema" option
# is dropped: when a schema is supplied, that schema is used, so requesting
# inference was redundant and misleading.
df = (
    spark.read
    .option("header", True)
    .schema(taxiFaresSchema)
    .csv("/FileStore/tables/nycTaxiFares.csv")
)
df.show(1)

# Preparation and the Model

In [12]:
from pyspark.sql.functions import *
# Add calendar parts (year, month, day) of each ride's startTime.
# NOTE(review): the select that builds dfRide drops these columns again, so
# they appear unused.
dfR1 = (
    dfR
    .withColumn("year", year(col("startTime")))
    .withColumn("month", month(col("startTime")))
    .withColumn("day", dayofmonth(col("startTime")))
)

In [13]:
# Add calendar parts (year, month, day) of each fare's startTime.
# NOTE(review): the following select keeps only rideId/startTime/tip, so
# these columns appear unused.
dff = (
    df
    .withColumn("year", year(col("startTime")))
    .withColumn("month", month(col("startTime")))
    .withColumn("day", dayofmonth(col("startTime")))
)

In [14]:
# Keep the ride id, the drop-off time and the pickup/drop-off coordinates.
dfRide = dfR1.select(["rideId", "endTime", "startLon", "startLat", "endLon", "endLat"])

In [15]:
# Keep the fare's ride id (renamed to avoid clashing with the rides' rideId
# in the join), its startTime and the tip.
dff = dff.withColumnRenamed("rideId", "rideId_fares").select("rideId_fares", "startTime", "tip")

In [16]:
# Longitude/latitude bounding box (degrees) used to filter ride coordinates.
# NOTE(review): presumably meant to approximate Manhattan; a rectangle this
# size also covers parts of neighbouring areas — confirm it is intended.
lonWest, lonEast = -74.037, -73.887
latSouth, latNorth = 40.695, 40.899

In [17]:
# Keep rides whose pickup AND drop-off both lie inside the bounding box
# (bounds inclusive: between() means >= low and <= high).
dfRide = dfRide.filter(
    col("startLon").between(lonWest, lonEast)
    & col("startLat").between(latSouth, latNorth)
    & col("endLon").between(lonWest, lonEast)
    & col("endLat").between(latSouth, latNorth)
)
# Keep only the "END" event rows.
# NOTE(review): isStart is not among dfRide's selected columns; this relies on
# Spark resolving the missing attribute from the parent plan. Filtering before
# the select would be clearer.
dfRide = dfRide.filter(col("isStart") == "END")

In [18]:
# Preview the first five filtered rides.
dfRide.show(n=5)

In [19]:
# Number of rides left after the bounding-box and END filters (shown as the cell result).
dfRide.count()

In [20]:
# Inner-join each ride with its fare on the ride id. The ride's drop-off time
# (endTime) must fall after the fare's startTime and at most two hours later.
dfRidesManhattan = dfRide.join(
    other=dff,
    on=expr("rideId_fares = rideId AND endTime > startTime AND endTime <= startTime + interval 2 hours"),
)

In [21]:
# Keep only rides that received a (strictly) positive tip.
dfRidesManhattan = dfRidesManhattan.where(dfRidesManhattan["tip"] > 0)

In [22]:
# Add the pickup hour, then drop the fare-side ride id (redundant after the join).
dfRidesManhattan = dfRidesManhattan.withColumn("startHour", hour("startTime"))
dfRidesManhattan = dfRidesManhattan.drop("rideId_fares")

In [23]:
# Preview the first five joined rides.
dfRidesManhattan.show(n=5)

In [24]:
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql import functions as F
from pyspark.sql.functions import udf

In [25]:
# UDF wrapping a scalar value in a one-element dense ML vector.
vectCol = udf(lambda value: Vectors.dense(value), returnType=VectorUDT())

In [26]:
 dfRidesManhattan=dfRidesManhattan.withColumn("tipVect", vectCol(dfRidesManhattan["tip"]))

In [27]:
# Preview three rows including the new tipVect column.
dfRidesManhattan.show(n=3)

In [28]:
from pyspark.ml.feature import MinMaxScaler
# Rescale the vectorised tip ("tipVect") into [0, 1], written to "tipScaled".
tipScaler = MinMaxScaler(min=0.0, max=1.0, inputCol="tipVect", outputCol="tipScaled")

In [29]:
from pyspark.sql.functions import *
import numpy as np
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler, VectorSlicer
from pyspark.ml import Pipeline

In [30]:
# Clustering features: pickup location plus the *scaled* tip. Every pipeline
# below runs tipScaler before this assembler, but assembling the raw "tip"
# left tipScaler's output unused. Unscaled dollar tips also dwarf coordinate
# differences (fractions of a degree), so clusters would split almost purely
# by tip. The tip stays at index 2 of the feature vector.
featuresAssembler = VectorAssembler().setInputCols(['startLon', 'startLat', 'tipScaled']).setOutputCol("features")

In [31]:
from pyspark.ml.clustering import KMeans,KMeansModel
from pyspark.ml.evaluation import ClusteringEvaluator

# Fit a KMeans pipeline for each k in 7..15 and record [k, score] in Ks1,
# where score is ClusteringEvaluator's default metric (silhouette) on the
# training data. The evaluator has no per-run state, so one instance is reused.
evaluator = ClusteringEvaluator()
Ks1 = []
for k in range(7, 16):
    sweepPipeline = Pipeline(stages=[tipScaler, featuresAssembler, KMeans(k=k, seed=1)])
    scored = sweepPipeline.fit(dfRidesManhattan).transform(dfRidesManhattan)
    Ks1.append([k, evaluator.evaluate(scored)])
Ks1

In [32]:
# Pick the k with the highest score from the KMeans sweep. sorted() is stable
# and reverse=True keeps tied entries in their original order, so ties resolve
# to the smallest k. (Builtin max() cannot be used here: the star import from
# pyspark.sql.functions shadows it.)
res = sorted(Ks1, key=lambda pair: pair[1], reverse=True)[0][0]

# printing result (the sweep is KMeans, not k-nearest-neighbours)
print("The best k for KMeans is : ", res)

In [33]:
from pyspark.ml.clustering import KMeans,KMeansModel
# Final KMeans pipeline with k fixed at 12.
# NOTE(review): confirm 12 matches the best k reported by the KMeans sweep.
kmeansEstimator = KMeans(k=12, seed=1)
prepareKmeansPipeline = Pipeline(stages=[tipScaler, featuresAssembler, kmeansEstimator])
model_1 = prepareKmeansPipeline.fit(dfRidesManhattan)

In [34]:
# Print each KMeans cluster centre (feature order: lon, lat, tip feature),
# one per line. Stage 2 of the fitted pipeline is the KMeans model.
kmeansStage = model_1.stages[2]
centers = kmeansStage.clusterCenters()
for clusterCenter in centers:
    print(clusterCenter)

In [35]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Fit a BisectingKMeans pipeline for each k in 7..15 and record [k, score]
# in Ks2 (ClusteringEvaluator default metric). One evaluator is reused.
evaluator = ClusteringEvaluator()
Ks2 = []
for k in range(7, 16):
    sweepPipeline = Pipeline(stages=[tipScaler, featuresAssembler, BisectingKMeans(k=k, seed=1)])
    scored = sweepPipeline.fit(dfRidesManhattan).transform(dfRidesManhattan)
    Ks2.append([k, evaluator.evaluate(scored)])
Ks2

In [36]:
# Highest-scoring k from the BisectingKMeans sweep; ties keep sweep order,
# i.e. the smallest k wins.
res = sorted(Ks2, key=lambda pair: pair[1], reverse=True)[0][0]

# printing result
print("The best k for BisectingKMeans is : ", res)

In [37]:
from pyspark.ml.clustering import BisectingKMeans
# Final BisectingKMeans pipeline with k fixed at 8.
# NOTE(review): confirm 8 matches the best k reported by the sweep.
BisectingKMeansEstimator = BisectingKMeans(k=8, seed=1)
prepareKmeansPipeline = Pipeline(stages=[tipScaler, featuresAssembler, BisectingKMeansEstimator])
model_2 = prepareKmeansPipeline.fit(dfRidesManhattan)

In [38]:
from pyspark.ml.clustering import GaussianMixture

# Fit a GaussianMixture pipeline for each k in 7..15 and record [k, score]
# in Ks3 (ClusteringEvaluator default metric on the training data).
# Bug fix: each entry previously stored evaluator_2, a stale BisectingKMeans
# score. Every k therefore got the same value and the "best k" was always the
# first candidate (7). The GMM's own score is now recorded.
Ks3 = []
for i in range(7, 16):
    GaussianMixtureEstimator = GaussianMixture().setK(i).setSeed(1)
    prepareKmeansPipeline = Pipeline().setStages([tipScaler, featuresAssembler, GaussianMixtureEstimator])
    model_3 = prepareKmeansPipeline.fit(dfRidesManhattan)
    predictions_3 = model_3.transform(dfRidesManhattan)
    evaluator = ClusteringEvaluator()
    evaluator_3 = evaluator.evaluate(predictions_3)
    Ks3.append([i, evaluator_3])
Ks3

In [39]:
# Highest-scoring k from the GaussianMixture sweep; ties keep sweep order,
# i.e. the smallest k wins.
res = sorted(Ks3, key=lambda pair: pair[1], reverse=True)[0][0]

# printing result
print("The best k for GaussianMixture is : ", res)

In [40]:
from pyspark.ml.clustering import GaussianMixture
# Final GaussianMixture pipeline with k fixed at 7.
# NOTE(review): confirm 7 against the Ks3 sweep results — if the sweep stored
# the same score for every k, the first candidate (7) wins by default.
GaussianMixtureEstimator = GaussianMixture(k=7, seed=1)
prepareKmeansPipeline = Pipeline(stages=[tipScaler, featuresAssembler, GaussianMixtureEstimator])
model_3 = prepareKmeansPipeline.fit(dfRidesManhattan)


# Evaluating clusters
Use `ClusteringEvaluator` (silhouette score by default) to evaluate the fitted clusterings.

In [42]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Score the final KMeans model on the training rides (default columns:
# "features" / "prediction").
predictions_1 = model_1.transform(dfRidesManhattan)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
evaluator_1 = evaluator.evaluate(predictions_1)
evaluator_1

In [43]:
#Computing cluster's variances
from pyspark.ml.stat import Summarizer

In [44]:
# Per-cluster variance of each feature component (lon, lat, tip feature)
# for the KMeans assignments.
variances = predictions_1.groupBy("prediction").agg(
    Summarizer.variance(col("features")).alias("clustersVar")
)

In [45]:
# Debug check: variances is the DataFrame returned by groupBy().agg(), not a local collection.
type(variances)

In [46]:
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.ml.linalg import Vectors, VectorUDT

# Despite its name, firstelement returns component 2 of the variance vector,
# i.e. the tip feature's variance (features = [lon, lat, tip feature]).
firstelement = udf(lambda vec: float(vec[2]), returnType=FloatType())
t = variances.select(col("prediction"), firstelement(col("clustersVar")).alias("tipvar"))

In [47]:
# Show the per-cluster tip variance (show's default of 20 rows, made explicit).
t.show(n=20)

In [48]:
# The "elevated tip" zone is the KMeans cluster whose rides have the highest
# MEAN tip (in the tip column's original units). Selecting by the largest tip
# variance would pick the most spread-out cluster, not the best-paying one.
from pyspark.sql import functions as F

clusterTips = predictions_1.groupBy("prediction").agg(F.avg("tip").alias("avgTip"))
bestRow = clusterTips.orderBy(F.desc("avgTip")).first()
m = bestRow["avgTip"]  # highest per-cluster mean tip
Zone_Elavated_Tip = bestRow["prediction"]
print("The zone with elevated tip is : ", Zone_Elavated_Tip)

In [49]:
# Score the final BisectingKMeans model on the training rides.
predictions_2 = model_2.transform(dfRidesManhattan)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
evaluator_2 = evaluator.evaluate(predictions_2)
evaluator_2

In [50]:
# Per-cluster feature variances for BisectingKMeans; keep only the tip
# feature's variance (vector component 2).
variances_2 = predictions_2.groupBy("prediction").agg(
    Summarizer.variance(col("features")).alias("clustersVar")
)
firstelement = udf(lambda vec: float(vec[2]), returnType=FloatType())
ff = variances_2.select(col("prediction"), firstelement(col("clustersVar")).alias("tipvar"))

In [51]:
# Show the per-cluster tip variance for BisectingKMeans.
ff.show(n=20)

In [52]:
# The "elevated tip" zone is the BisectingKMeans cluster whose rides have the
# highest MEAN tip. Selecting by the largest tip variance would pick the most
# spread-out cluster, not the best-paying one.
from pyspark.sql import functions as F

clusterTips2 = predictions_2.groupBy("prediction").agg(F.avg("tip").alias("avgTip"))
bestRow2 = clusterTips2.orderBy(F.desc("avgTip")).first()
mm = bestRow2["avgTip"]  # highest per-cluster mean tip
Zone_Elavated_Tip2 = bestRow2["prediction"]
print("The zone with elevated tip is : ", Zone_Elavated_Tip2)

In [53]:
from pyspark.ml.evaluation import ClusteringEvaluator
# Score the final GaussianMixture model on the training rides.
predictions_3 = model_3.transform(dfRidesManhattan)
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="prediction")
evaluator_3 = evaluator.evaluate(predictions_3)
evaluator_3

In [54]:
# Per-cluster feature variances for GaussianMixture; keep only the tip
# feature's variance (vector component 2).
variances_3 = predictions_3.groupBy("prediction").agg(
    Summarizer.variance(col("features")).alias("clustersVar")
)
firstelement = udf(lambda vec: float(vec[2]), returnType=FloatType())
kk = variances_3.select(col("prediction"), firstelement(col("clustersVar")).alias("tipvar"))

In [55]:
# Show the per-cluster tip variance for GaussianMixture.
kk.show(n=20)

In [56]:
# The "elevated tip" zone is the GaussianMixture cluster whose rides have the
# highest MEAN tip. Selecting by the largest tip variance would pick the most
# spread-out cluster, not the best-paying one.
from pyspark.sql import functions as F

clusterTips3 = predictions_3.groupBy("prediction").agg(F.avg("tip").alias("avgTip"))
bestRow3 = clusterTips3.orderBy(F.desc("avgTip")).first()
mmm = bestRow3["avgTip"]  # highest per-cluster mean tip
Zone_Elavated_Tip3 = bestRow3["prediction"]
print("The zone with elevated tip is : ", Zone_Elavated_Tip3)