<h1>Batch processing of yellow taxi trip record data from New York City (NYC) Taxi and Limousine Commission (TLC)</h1>
<h2>Introduction</h2>
<p align="justify">Each month, the Taxi and Limousine Commission (TLC) in New York City publishes a CSV file containing a record of every journey undertaken in a Yellow Taxi during that month. The data in those CSV files were collected on behalf of the TLC by two technology providers authorized under the Taxicab and Livery Passenger Enhancement Programme (TPEP).</p>

The following analysis is based on trip and fare data for New York Yellow Taxis collected in 2016. The data is stored in AWS S3 and can be downloaded from: <br/>
https://s3-eu-west-1.amazonaws.com/clo34/yell2016.csv 

More information about the NYC Taxi and Limousine Commission and Yellow Taxis can be found at the following link.
<br/>http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml

<p>Architecture used: an Apache Spark cluster running on AWS EMR, with YARN as the resource manager.<br/>
Programming language used: Scala</p>

<h2>Analysis</h2>

In [2]:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}

import java.sql.Timestamp
import java.util.Calendar
import java.text.SimpleDateFormat

In [5]:
// Creating an org.apache.spark.sql.SparkSession object.
val session = SparkSession.builder.appName("NYCYellowCabTaxi").getOrCreate()

The following schema was used to process the trip record dataset.

<table align="center">
    <tr><th>Column name</th><th>Data description</th></tr>
    <tr><td>VendorID</td><td>A code indicating the TPEP provider that provided the record.
1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.</td></tr>
    <tr><td>tpep_pickup_datetime</td><td>The date and time when the meter was engaged</td></tr>
    <tr><td>tpep_dropoff_datetime</td><td>The date and time when the meter was disengaged</td></tr>
    <tr><td>passenger_count</td><td>The number of passengers in the vehicle. This is a driver-entered value.</td></tr>
    <tr><td>trip_distance</td><td>The elapsed trip distance in miles reported by the taximeter</td></tr>
    <tr><td>pickup_longitude</td><td>Longitude at which the taximeter was engaged</td></tr>
    <tr><td>pickup_latitude</td><td>Latitude at which the taximeter was engaged</td></tr>
    <tr><td>RatecodeID</td><td>The final rate code in effect at the end of the trip.
1= Standard rate
2=JFK
3=Newark
4=Nassau or Westchester
5=Negotiated fare
6=Group ride</td></tr>
    <tr><td>store_and_fwd_flag</td><td>This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server.
Y= store and forward trip
N= not a store and forward trip</td></tr>
    <tr><td>dropoff_longitude</td><td>Longitude at which the taximeter was disengaged</td></tr>
    <tr><td>dropoff_latitude</td><td>Latitude at which the taximeter was disengaged</td></tr>
    <tr><td>payment_type</td><td>A numeric code signifying how the passenger paid for the trip.
1= Credit card
2= Cash
3= No charge
4= Dispute
5= Unknown
6= Voided trip</td></tr>
    <tr><td>fare_amount</td><td>The time-and-distance fare calculated by the meter</td></tr>
    <tr><td>extra</td><td>Miscellaneous extras and surcharges, such as rush hour and overnight charges.</td></tr>
    <tr><td>mta_tax</td><td> &#36; 0.50 MTA tax that is automatically triggered based on the metered
rate in use.</td></tr>
    <tr><td>tip_amount</td><td>Tip amount – This field is automatically populated for credit card
tips. Cash tips are not included.</td></tr>
    <tr><td>tolls_amount</td><td>Total amount of all tolls paid in trip</td></tr>
    <tr><td>improvement_surcharge</td><td>&#36; 0.30 improvement surcharge assessed trips at the flag drop. The
improvement surcharge began being levied in 2015.</td></tr>
    <tr><td>total_amount</td><td>The total amount charged to passengers. Does not include cash tips.</td></tr>
</table>
<br/>

The following PDF file further elaborates on some of the columns in the schema. <br/>

http://www.nyc.gov/html/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf

In [8]:
val journeySchema = StructType(Array(
    StructField("VendorID", IntegerType, true), StructField("tpep_pickup_datetime", TimestampType, false),
    StructField("tpep_dropoff_datetime", TimestampType, false), StructField("passenger_count", IntegerType, true),
    StructField("trip_distance", DoubleType, false), StructField("pickup_longitude", DoubleType, false),
    StructField("pickup_latitude", DoubleType, false), StructField("RatecodeID", IntegerType, true),
    StructField("store_and_fwd_flag", StringType, true), StructField("dropoff_longitude", DoubleType, false),
    StructField("dropoff_latitude", DoubleType, false), StructField("payment_type", IntegerType, true),
    StructField("fare_amount", DoubleType, false), StructField("extra", DoubleType, true),
    StructField("mta_tax", DoubleType, true), StructField("tip_amount", DoubleType, true),
    StructField("tolls_amount", DoubleType, true), StructField("improvement_surcharge", DoubleType, true),
    StructField("total_amount", DoubleType, true)
))

// Creating an org.apache.spark.sql.DataFrame object.
// A new column, journey_time, holding the journey time in seconds is added to the DataFrame.
val taxiDf = session.read.format("csv").schema(journeySchema).option("header", "true").
    load("s3a://clo34/yell2016.csv").
    withColumn("journey_time", col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) 
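
<p>As a worked illustration of the journey_time column: casting a timestamp to long yields seconds since the epoch, so the difference of the two casts is the journey duration in seconds. The sketch below reproduces that arithmetic outside Spark with java.time. The two timestamps are illustrative values and the name journeySeconds is hypothetical.</p>

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical helper: journey duration in whole seconds between pickup and drop-off.
def journeySeconds(pickup: LocalDateTime, dropoff: LocalDateTime): Long =
  Duration.between(pickup, dropoff).getSeconds

val pickup = LocalDateTime.parse("2016-04-15T16:32:14")
val dropoff = LocalDateTime.parse("2016-04-15T16:42:27")
println(journeySeconds(pickup, dropoff)) // 613 (10 minutes 13 seconds)
```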

In [9]:
// displaying first row
taxiDf.head(1)

res11: Array[org.apache.spark.sql.Row] = Array([2,2016-01-01 00:00:00.0,2016-01-01 00:00:00.0,2,1.1,-73.99037170410156,40.73469543457031,1,N,-73.98184204101562,40.73240661621094,2,7.5,0.5,0.5,0.0,0.0,0.3,8.8,0])


<p><b>Defining a function to check whether trip_distance is greater than or equal to the geo distance:</b></p>
<p>Geo distance is the great-circle distance between two points, measured along the surface of the earth.</p>
<p>Assumption: Entries where pickup_longitude and pickup_latitude are equal to dropoff_longitude and dropoff_latitude respectively were considered valid records, because there could be journeys where people started from a location and came back to the same location. In these cases trip_distance was not validated against the geo distance.</p>

In [13]:
/** Calculates the geo (great-circle) distance between two locations using the haversine formula.
*
*  @param pickup_latitude Latitude at which the taximeter was engaged
*  @param pickup_longitude Longitude at which the taximeter was engaged
*  @param dropoff_latitude Latitude at which the taximeter was disengaged
*  @param dropoff_longitude Longitude at which the taximeter was disengaged
*
*  @return geo distance between the two locations in whole miles (the fractional part is truncated)
*/
def getDistance(pickup_latitude: Double, pickup_longitude: Double, dropoff_latitude: Double,
                  dropoff_longitude: Double): Int = {
    val sinLat = Math.sin(Math.toRadians(pickup_latitude - dropoff_latitude) / 2)
    val sinLng = Math.sin(Math.toRadians(pickup_longitude - dropoff_longitude) / 2)
    val a = sinLat * sinLat + (Math.cos(Math.toRadians(pickup_latitude)) * Math.cos(Math.toRadians(dropoff_latitude))*
      sinLng * sinLng)
    val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a))
    // 3958.756 is the radius of the earth in miles; toInt truncates towards zero
    (3958.756 * c).toInt
}


/** Decides whether a record is valid, based on trip_distance >= geo distance for cases where the starting point
*   and the ending point are not the same.
*
*  @param pickup_latitude Latitude at which the taximeter was engaged
*  @param pickup_longitude Longitude at which the taximeter was engaged
*  @param dropoff_latitude Latitude at which the taximeter was disengaged
*  @param dropoff_longitude Longitude at which the taximeter was disengaged
*  @param trip_distance elapsed trip distance in miles
*
*  @return a boolean value indicating whether the condition is satisfied.
*/
def filterTrips(pickup_latitude: Double, pickup_longitude: Double, dropoff_latitude: Double,
                  dropoff_longitude: Double, trip_distance: Double): Boolean = {
    // First branch: both coordinates differ and the metered distance covers the geo distance.
    // Second branch: pickup and drop-off are exactly the same point (round trips).
    // Note: a trip whose pickup and drop-off share exactly one coordinate satisfies neither branch and is rejected.
    ((pickup_longitude != dropoff_longitude) && (pickup_latitude != dropoff_latitude) &&
    (trip_distance >= getDistance(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude))) ||
    ((pickup_longitude == dropoff_longitude) && (pickup_latitude == dropoff_latitude))
}

val filterTripsUDF = udf(filterTrips _)
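
<p>To make the geo distance check easier to verify by hand, here is a minimal stand-alone sketch of the same haversine calculation that keeps the result as a Double. The name haversineMiles and the sample coordinates are assumptions made for illustration; the earth radius constant is the one used in getDistance.</p>

```scala
// Haversine great-circle distance in miles, kept as a Double (no truncation).
def haversineMiles(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val earthRadiusMiles = 3958.756 // same constant as used in getDistance
  val sinLat = math.sin(math.toRadians(lat1 - lat2) / 2)
  val sinLon = math.sin(math.toRadians(lon1 - lon2) / 2)
  val a = sinLat * sinLat +
    math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) * sinLon * sinLon
  earthRadiusMiles * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
}

// One degree of latitude along a meridian is roughly 69.09 miles.
val oneDegree = haversineMiles(40.0, -74.0, 41.0, -74.0)
println(math.round(oneDegree * 100) / 100.0)      // 69.09
println(oneDegree.toInt)                          // 69, the truncated value getDistance would return
println(haversineMiles(40.7, -74.0, 40.7, -74.0)) // 0.0 for identical points
```

<p>Because getDistance truncates to whole miles, a trip is only rejected when its metered distance is below the integer part of the geo distance, which makes the check slightly lenient.</p>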

<p><b>Defining a function that divides the day (from midnight to midnight) into 30-minute periods and decides which period a journey belongs to:</b></p>
<p>Where a trip crosses a boundary (the drop-off is in a different period from the pickup), the trip was assigned to the period in which it spent more time. If the trip straddles two periods equally, it was assigned to the earlier period. If a trip spans more than two periods, it was assigned to the period containing the midpoint of the journey.</p>

In [15]:
// Helper used by getPeriodStartTime: rounds a timestamp down to the start of its 30-minute period.
def getStartTime(timestamp: Timestamp): Timestamp = {
    val calendar = Calendar.getInstance()
    calendar.setTime(timestamp)
    val minute = calendar.get(Calendar.MINUTE)

    // minutes 0-29 belong to the period starting on the hour, minutes 30-59 to the one starting at half past
    if (minute <= 29)
      calendar.set(Calendar.MINUTE, 0)
    else
      calendar.set(Calendar.MINUTE, 30)
    calendar.set(Calendar.SECOND, 0)
    calendar.set(Calendar.MILLISECOND, 0)
    new Timestamp(calendar.getTimeInMillis())

}

/** Calculates the start time of the time period corresponding to a journey
*
*  @param pickup The date and time when the meter was engaged
*  @param drop The date and time when the meter was disengaged
*
*  @return a Timestamp object corresponding to start time of the time period
*/

def getPeriodStartTime(pickup: Timestamp, drop: Timestamp): Timestamp = {
    val firstPeriodStartTime = getStartTime(pickup)
    val finalPeriodStartTime = getStartTime(drop)
    val finalPeriodEndTime = new Timestamp(finalPeriodStartTime.getTime() + (1000 * 60 * 30))
    val numPeriods = (finalPeriodEndTime.getTime() - firstPeriodStartTime.getTime()) / (1000 * 60 * 30)
    if (numPeriods == 1)
      firstPeriodStartTime

    else if (numPeriods == 2) {
      val firstDuration = 30 * 60 * 1000 - (pickup.getTime - firstPeriodStartTime.getTime) // time spent in the first period (ms)
      val secondDuration = (drop.getTime - finalPeriodStartTime.getTime) // time spent in the second period (ms)

      // ties go to the earlier period
      if (firstDuration >= secondDuration)
        firstPeriodStartTime
      else
        finalPeriodStartTime
    }

    else if (numPeriods > 2) {
      val midpoint = new Timestamp(pickup.getTime + (drop.getTime - pickup.getTime) / 2)
      getStartTime(midpoint)
    }
    else null // only reachable when the drop-off falls in an earlier period than the pickup
}

/** Calculates the time period corresponding to a journey in HH:mm-HH:mm format.
*
*  @param pickup The date and time when the meter was engaged
*  @param drop The date and time when the meter was disengaged
*
*  @return a String object depicting the time period in the format HH:mm-HH:mm
*/

def getPeriod(pickup: Timestamp, drop: Timestamp): String = {
    val timeFormat = new SimpleDateFormat("HH:mm")
    val startTime = getPeriodStartTime(pickup, drop)
    timeFormat.format(startTime) + "-" + timeFormat.format(new Timestamp(startTime.getTime() + (1000 * 60 * 29)))
}

val getPeriodUDF = udf(getPeriod _)
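
<p>The assignment rule can also be checked with plain integer arithmetic. The sketch below is a simplified stand-in, not the function used in the analysis: it assumes both times fall on the same day and are given as seconds since midnight, and the names periodIndex, periodLabel, pad and at are hypothetical.</p>

```scala
val periodSeconds = 30L * 60

// Hypothetical simplification: times are seconds since midnight on the same day.
def periodIndex(pickupSec: Long, dropSec: Long): Long = {
  val first = pickupSec / periodSeconds
  val last = dropSec / periodSeconds
  if (first == last) first                       // no boundary crossed
  else if (last - first == 1) {                  // exactly one boundary crossed
    val inFirst = (first + 1) * periodSeconds - pickupSec
    val inSecond = dropSec - last * periodSeconds
    if (inFirst >= inSecond) first else last     // ties go to the earlier period
  } else (pickupSec + (dropSec - pickupSec) / 2) / periodSeconds // midpoint rule
}

// Zero-pads a number to two digits.
def pad(n: Long): String = if (n < 10) s"0$n" else n.toString

// Formats a period index as HH:mm-HH:mm, e.g. index 20 -> "10:00-10:29".
def periodLabel(index: Long): String = {
  val start = index * 30
  val end = start + 29
  s"${pad(start / 60)}:${pad(start % 60)}-${pad(end / 60)}:${pad(end % 60)}"
}

// Converts hours and minutes to seconds since midnight.
def at(h: Int, m: Int): Long = h * 3600L + m * 60L

println(periodLabel(periodIndex(at(10, 20), at(10, 35)))) // 10:00-10:29 (10 min vs 5 min)
println(periodLabel(periodIndex(at(10, 25), at(10, 40)))) // 10:30-10:59 (5 min vs 10 min)
println(periodLabel(periodIndex(at(10, 25), at(10, 35)))) // 10:00-10:29 (tie)
println(periodLabel(periodIndex(at(10, 20), at(11, 40)))) // 11:00-11:29 (midpoint at 11:00)
```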

<p><b>Cleansing the dataset:</b></p>
<table align="right">
    <tr><th>Records satisfying any of the following criteria were dropped.</th></tr>
    <tr><td>payment_type is 6: a payment type of 6 denotes a voided trip.</td></tr>
    <tr><td>At least one of tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance, pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, fare_amount is null: these are the minimum columns needed for the analysis, which cannot be conducted without proper values for them.</td></tr>
    <tr><td>tpep_dropoff_datetime is earlier than tpep_pickup_datetime: a trip cannot end before it starts.</td></tr>
    <tr><td>trip_distance with a zero or negative value</td></tr>
    <tr><td>fare_amount with a zero or negative value</td></tr>
    <tr><td>tip_amount with a negative value</td></tr>
    <tr><td>journey time less than 300 s (5 min): it was assumed that a taxi journey lasts at least 5 minutes.</td></tr>
    <tr><td>trip_distance less than 0.1 miles: distances below 0.1 miles were assumed to be walking distance, for which a taxi is unnecessary.</td></tr>
    <tr><td>pickup_latitude, pickup_longitude, dropoff_latitude or dropoff_longitude equal to zero: a quick inspection of the data revealed many records with zero in those columns and very short trips. These records appear to be invalid.</td></tr>
    <tr><td>trip_distance lower than the geo distance between the starting and ending locations, where those locations are not the same.</td></tr>
</table>
<h1></h1>
<table align="right">
    <tr><th>The following columns were added.</th></tr>
    <tr><td>time_period: the 30-minute period a journey belongs to, calculated using getPeriodUDF</td></tr>
    <tr><td>speed_mph: speed of each journey in mph</td></tr>
    <tr><td>tip_per_distance: tip per mile for each journey</td></tr>
</table>
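
<p>As a rough illustration of the numeric criteria listed above, the following sketch expresses them as a single predicate over a plain Scala record. The Trip case class, its field names and the sample values are hypothetical. The null check and the geo distance check are left out to keep the sketch short.</p>

```scala
// Hypothetical record type holding only the fields the criteria refer to.
case class Trip(paymentType: Int, journeySeconds: Long, tripDistance: Double,
                fareAmount: Double, tipAmount: Double,
                pickupLat: Double, pickupLon: Double,
                dropoffLat: Double, dropoffLon: Double)

// Returns true when a trip survives the numeric criteria.
// A journey time of at least 300 s also implies that the drop-off is later than the pickup.
def keep(t: Trip): Boolean =
  t.paymentType != 6 &&
  t.journeySeconds >= 300 &&
  t.tripDistance >= 0.1 &&
  t.fareAmount > 0 &&
  t.tipAmount >= 0 &&
  Seq(t.pickupLat, t.pickupLon, t.dropoffLat, t.dropoffLon).forall(_ != 0.0)

val good = Trip(1, 613, 1.3, 8.0, 2.0, 40.7657, -73.9545, 40.7700, -73.9600)
val voided = good.copy(paymentType = 6)
val tooShort = good.copy(journeySeconds = 120)
val zeroCoords = good.copy(pickupLat = 0.0, pickupLon = 0.0)

println(Seq(good, voided, tooShort, zeroCoords).map(keep)) // List(true, false, false, false)
```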

In [17]:
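// Applying the cleansing criteria, then adding the time_period, speed_mph and tip_per_distance columns.
// Note: format_number inserts thousands separators, so a value of 1,000 or more becomes null after the cast to double.
val clensedTaxiDf = taxiDf.filter("payment_type != 6").na.drop(Seq("tpep_pickup_datetime", "tpep_dropoff_datetime", 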
    "trip_distance","pickup_latitude","pickup_longitude","dropoff_latitude","dropoff_longitude","fare_amount")).
    filter("tpep_dropoff_datetime > tpep_pickup_datetime").
    filter("trip_distance > 0").
    filter("fare_amount > 0").
    filter("tip_amount >= 0").
    filter("journey_time >= 300").
    filter("trip_distance >= 0.1").
    filter("pickup_latitude != 0 and pickup_longitude != 0 and dropoff_latitude !=0  and dropoff_longitude != 0").
    filter(filterTripsUDF(col("pickup_latitude"), col("pickup_longitude"), col("dropoff_latitude"), col("dropoff_longitude"), col("trip_distance"))).
    select("tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance", "pickup_longitude", "pickup_latitude",
      "dropoff_longitude", "dropoff_latitude", "fare_amount", "tip_amount", "journey_time").
    withColumn("time_period", getPeriodUDF(col("tpep_pickup_datetime"), col("tpep_dropoff_datetime"))).
    withColumn("speed_mph", format_number(col("trip_distance") * 3600.0 / col("journey_time"), 2).cast("double")).
    withColumn("tip_per_distance", format_number(col("tip_amount") / col("trip_distance"), 2).cast("double")).
    drop("journey_time")
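
<p>The two derived columns are simple ratios. A minimal sketch of the arithmetic, outside Spark and with made-up input values, might look like this. The helper names round2, speedMph and tipPerDistance are hypothetical, and round2 only stands in for the rounding to two decimal places.</p>

```scala
// Rounds to two decimal places (hypothetical helper).
def round2(x: Double): Double = math.round(x * 100) / 100.0

// speed in mph = miles * 3600 / seconds
def speedMph(tripDistanceMiles: Double, journeySeconds: Long): Double =
  round2(tripDistanceMiles * 3600.0 / journeySeconds)

// tip per mile = tip / miles
def tipPerDistance(tipAmount: Double, tripDistanceMiles: Double): Double =
  round2(tipAmount / tripDistanceMiles)

println(speedMph(2.5, 600))       // 15.0
println(tipPerDistance(3.0, 2.5)) // 1.2
```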

In [18]:
clensedTaxiDf.columns

res23: Array[String] = Array(tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance, pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, fare_amount, tip_amount, time_period, speed_mph, tip_per_distance)


In [19]:
clensedTaxiDf.persist()

res24: clensedTaxiDf.type = [tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp ... 10 more fields]


<p><b>Finding the average speed of taxis during each 30-minute period, across the whole year:</b></p>


In [21]:
val groupedByTimePeriod= clensedTaxiDf.groupBy("time_period")

In [22]:
groupedByTimePeriod.avg("speed_mph").withColumn("avg(speed_mph)",format_number(col("avg(speed_mph)"),2).
                                                cast("double")).orderBy("time_period").show(48)

+-----------+--------------+
|time_period|avg(speed_mph)|
+-----------+--------------+
|00:00-00:29|         14.64|
|00:30-00:59|          14.9|
|01:00-01:29|         15.22|
|01:30-01:59|         15.52|
|02:00-02:29|         15.68|
|02:30-02:59|         15.95|
|03:00-03:29|         16.42|
|03:30-03:59|         17.02|
|04:00-04:29|         18.06|
|04:30-04:59|         20.16|
|05:00-05:29|         21.71|
|05:30-05:59|         21.02|
|06:00-06:29|         19.16|
|06:30-06:59|         16.51|
|07:00-07:29|         14.29|
|07:30-07:59|         12.12|
|08:00-08:29|         10.83|
|08:30-08:59|         10.22|
|09:00-09:29|         10.11|
|09:30-09:59|         10.13|
|10:00-10:29|         10.26|
|10:30-10:59|         10.29|
|11:00-11:29|         10.22|
|11:30-11:59|          9.99|
|12:00-12:29|          9.86|
|12:30-12:59|          9.94|
|13:00-13:29|         10.07|
|13:30-13:59|         10.07|
|14:00-14:29|          10.1|
|14:30-14:59|          9.84|
|15:00-15:29|          9.73|
|15:30-15:59| 

<p><b>Finding the maximum fare for a single trip in each 30-minute time period, across the whole year, in descending order:</b></p>

In [23]:
groupedByTimePeriod.max("fare_amount").orderBy(col("max(fare_amount)").desc).show(48)

+-----------+----------------+
|time_period|max(fare_amount)|
+-----------+----------------+
|23:00-23:29|       187440.96|
|14:30-14:59|       154810.43|
|18:00-18:29|       153231.93|
|16:30-16:59|       133057.84|
|11:30-11:59|       126348.88|
|17:30-17:59|          6400.0|
|16:00-16:29|         2020.37|
|22:30-22:59|          2008.5|
|06:30-06:59|         1677.77|
|07:30-07:59|          1500.0|
|13:00-13:29|          1426.0|
|08:30-08:59|          1411.0|
|00:00-00:29|           954.0|
|19:00-19:29|           900.0|
|20:00-20:29|           893.5|
|02:00-02:29|           819.5|
|22:00-22:29|           813.0|
|19:30-19:59|           742.0|
|10:30-10:59|           683.5|
|10:00-10:29|           671.5|
|18:30-18:59|          666.61|
|15:30-15:59|           660.0|
|12:00-12:29|           636.5|
|01:00-01:29|           600.0|
|04:30-04:59|           600.0|
|04:00-04:29|           600.0|
|00:30-00:59|           574.0|
|14:00-14:29|           550.0|
|12:30-12:59|           530.0|
|23:30-2

<p><b>Finding the maximum tip for a single trip in each 30-minute time period, across the whole year, in descending order:</b></p>

In [24]:
groupedByTimePeriod.max("tip_amount").orderBy(col("max(tip_amount)").desc).show(48)

+-----------+---------------+
|time_period|max(tip_amount)|
+-----------+---------------+
|04:00-04:29|         854.85|
|16:30-16:59|         744.96|
|17:00-17:29|         622.11|
|20:30-20:59|         544.44|
|10:00-10:29|         542.51|
|23:30-23:59|         520.38|
|20:00-20:29|         454.43|
|16:00-16:29|         445.32|
|21:30-21:59|          444.5|
|13:00-13:29|         444.48|
|00:00-00:29|         440.25|
|14:00-14:29|          418.3|
|03:30-03:59|          411.0|
|00:30-00:59|          400.0|
|10:30-10:59|          370.0|
|15:00-15:29|          352.0|
|22:30-22:59|          350.0|
|09:30-09:59|         346.76|
|12:30-12:59|          334.0|
|11:00-11:29|          333.0|
|23:00-23:29|         326.44|
|15:30-15:59|          300.0|
|19:30-19:59|          300.0|
|19:00-19:29|         289.86|
|18:00-18:29|          279.7|
|17:30-17:59|         272.77|
|14:30-14:59|          266.5|
|08:30-08:59|         266.01|
|13:30-13:59|          255.0|
|04:30-04:59|         253.73|
|01:00-01:

<p><b>Finding the top ten trips with the best tip per distance travelled:</b></p>

In [28]:
val bestTipPerDistanceDf= clensedTaxiDf.orderBy(col("tip_per_distance").desc).limit(10)

bestTipPerDistanceDf.select("tpep_pickup_datetime","tpep_dropoff_datetime","trip_distance",
                            "pickup_longitude","pickup_latitude").show()

bestTipPerDistanceDf.select("dropoff_longitude","dropoff_latitude","fare_amount","tip_amount","time_period","speed_mph",
                            "tip_per_distance").show()

+--------------------+---------------------+-------------+------------------+------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|  pickup_longitude|   pickup_latitude|
+--------------------+---------------------+-------------+------------------+------------------+
| 2016-04-15 16:32:14|  2016-04-15 16:42:27|          1.3|-73.95446014404297|  40.7657470703125|
| 2016-03-05 18:46:22|  2016-03-05 18:52:18|         0.17|-73.98796844482422| 40.74385070800781|
| 2016-04-03 14:27:10|  2016-04-03 14:40:39|          0.3|-74.00096893310547| 40.75750732421875|
| 2016-06-02 04:54:19|  2016-06-02 05:00:31|          0.3|-73.90486145019531|40.741363525390625|
| 2016-03-07 00:35:13|  2016-03-07 00:43:36|          0.1| -73.9891586303711| 40.74799728393555|
| 2016-03-18 14:20:59|  2016-03-18 14:26:02|          0.6|-74.01905822753906| 40.63248825073242|
| 2016-03-30 16:46:34|  2016-03-30 17:03:28|          0.3|-73.95664978027344| 40.65058898925781|
| 2016-05-15 02:41:45|  2016-0

<p><b>Identifying the top 5 locations for a driver to start a trip on a Saturday night between 10:00 and 10:30 pm:</b></p>


In [30]:
// adding columns to represent the day of the week (1 = Monday, ..., 7 = Sunday), hour and minute of the pickup time
val timePartsDf = clensedTaxiDf.select("tpep_pickup_datetime", "pickup_longitude", "pickup_latitude").
    withColumn("day_of_week", date_format(col("tpep_pickup_datetime"), "u").cast("int")).
    withColumn("hour", hour(col("tpep_pickup_datetime"))).
    withColumn("minutes", minute(col("tpep_pickup_datetime"))).drop("tpep_pickup_datetime")

In [32]:
// restricting data to weekday (Monday to Friday) pickups between 17:00 and 19:00
val weekday17_19Df = timePartsDf.filter("day_of_week in (1,2,3,4,5)").
    filter("(hour in (17,18)) or (hour ==19 and minutes==0)").drop("day_of_week", "hour", "minutes")

// restricting data to Saturday night pickups between 22:00 and 22:30
val saturday22_2230Df = timePartsDf.filter("day_of_week == 6").
    filter("hour ==22 and minutes <= 30").drop("day_of_week", "hour", "minutes")
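
<p>The Saturday night condition can be tried on individual timestamps with java.time, whose ISO day numbering also runs from 1 (Monday) to 7 (Sunday). This is only a sketch of the filter logic: the function name isSaturdayNightWindow and the sample timestamps are assumptions.</p>

```scala
import java.time.LocalDateTime

// Mirrors the filter above: ISO day number 6 is Saturday, hour 22, minute at most 30.
def isSaturdayNightWindow(pickup: LocalDateTime): Boolean =
  pickup.getDayOfWeek.getValue == 6 && pickup.getHour == 22 && pickup.getMinute <= 30

println(isSaturdayNightWindow(LocalDateTime.parse("2016-04-16T22:15:00"))) // true  (a Saturday)
println(isSaturdayNightWindow(LocalDateTime.parse("2016-04-16T22:45:00"))) // false (outside the window)
println(isSaturdayNightWindow(LocalDateTime.parse("2016-04-15T22:15:00"))) // false (a Friday)
```

<p>Note that a condition of the form minute <= 30 also admits pickups between 22:30:00 and 22:30:59.</p>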

In [43]:
// adding a features column. The data type of the features column is Vector.
val featuresSaturdayDf = new VectorAssembler().setInputCols(Array("pickup_latitude","pickup_longitude")).
    setOutputCol("features").transform(saturday22_2230Df) 

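// feature scaling: with default settings, StandardScaler divides each feature by its standard deviation without centring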
val scalar = new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures")
val scalarModelSaturday = scalar.fit(featuresSaturdayDf)
val saturdayDfForKMeans: DataFrame = scalarModelSaturday.transform(featuresSaturdayDf)

saturdayDfForKMeans.select("features","scaledFeatures").show(5,truncate=false)

+---------------------------------------+----------------------------------------+
|features                               |scaledFeatures                          |
+---------------------------------------+----------------------------------------+
|[40.729942321777344,-73.98892211914062]|[1459.5148357604442,-1995.7440437759371]|
|[40.761512756347656,-73.97013854980469]|[1460.6461292266306,-1995.2373841902943]|
|[40.74040985107422,-74.00582885742188] |[1459.8899287125,-1996.20007856139]     |
|[40.722320556640625,-73.98699951171875]|[1459.2417177873363,-1995.6921842245229]|
|[40.733463287353516,-73.987548828125]  |[1459.641005875148,-1995.7070012392128] |
+---------------------------------------+----------------------------------------+
only showing top 5 rows
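
<p>The scaled values above are the raw coordinates divided by the standard deviation of each column, which is why they keep their sign and stay far from zero. A small stand-alone sketch of that scaling and its inverse, using illustrative numbers rather than the dataset, is given below. The helper name sampleStd is hypothetical, and the corrected sample standard deviation (dividing by n - 1) is assumed.</p>

```scala
// Corrected sample standard deviation (divides by n - 1).
def sampleStd(xs: Seq[Double]): Double = {
  val mean = xs.sum / xs.size
  math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / (xs.size - 1))
}

val values = Seq(2.0, 4.0, 6.0)    // illustrative values, not taken from the dataset
val std = sampleStd(values)        // sqrt((4 + 0 + 4) / 2) = 2.0
val scaled = values.map(_ / std)   // scaling without centring
val restored = scaled.map(_ * std) // multiplying by std undoes the scaling

println(std)      // 2.0
println(scaled)   // List(1.0, 2.0, 3.0)
println(restored) // List(2.0, 4.0, 6.0)
```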



In [4]:
// creating an org.apache.spark.ml.clustering.KMeans object with k = 5 clusters
val kmeans: KMeans = new KMeans().setFeaturesCol("scaledFeatures").setK(5)

// fitting the model, which yields an org.apache.spark.ml.clustering.KMeansModel object
val saturdayModel = kmeans.fit(saturdayDfForKMeans)
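
<p>For readers unfamiliar with k-means, the sketch below shows the basic idea (Lloyd's iteration: assign each point to its nearest centre, then move each centre to the mean of its points) on four made-up 2-D points. It is a toy illustration with fixed starting centres, not Spark's implementation, and all names in it are hypothetical.</p>

```scala
type Point = (Double, Double)

// Squared Euclidean distance between two points.
def sqDist(a: Point, b: Point): Double = {
  val dx = a._1 - b._1
  val dy = a._2 - b._2
  dx * dx + dy * dy
}

// Component-wise mean of a non-empty group of points.
def mean(ps: Seq[Point]): Point =
  (ps.map(_._1).sum / ps.size, ps.map(_._2).sum / ps.size)

// One Lloyd iteration: assign each point to its nearest centre, then recompute the centres.
// A centre that attracts no points is left where it is.
def step(points: Seq[Point], centres: Seq[Point]): Seq[Point] = {
  val assigned = points.groupBy(p => centres.indices.minBy(i => sqDist(p, centres(i))))
  centres.indices.map(i => assigned.get(i).map(mean).getOrElse(centres(i)))
}

// Runs a fixed number of iterations from the given starting centres.
def kMeans(points: Seq[Point], init: Seq[Point], iterations: Int): Seq[Point] =
  (1 to iterations).foldLeft(init)((c, _) => step(points, c))

val points = Seq((0.0, 0.0), (0.0, 2.0), (10.0, 10.0), (10.0, 12.0))
val centres = kMeans(points, Seq((0.0, 0.0), (10.0, 10.0)), 5)
centres.foreach(println)
// (0.0,1.0)
// (10.0,11.0)
```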

In [49]:
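// identifying cluster centres; each coordinate is divided by its scaling factor (1 / standard deviation of
// latitude and longitude respectively) to convert the centres back to degrees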
saturdayModel.clusterCenters.foreach(vector =>
    println(s"${BigDecimal(vector(0)/35.83395292411391).setScale(6, BigDecimal.RoundingMode.HALF_DOWN).toFloat}," +
      s"${BigDecimal(vector(1)/26.97355207529975).setScale(6, BigDecimal.RoundingMode.HALF_DOWN).toFloat}")
)

40.78092,-73.96542
40.754185,-73.983925
40.64601,-73.785446
40.76914,-73.87549
40.725956,-73.99468


<p><b>Plotting cluster centres on a map. The map was created using the following website:</b>
    
http://www.hamstermap.com/quickmap.php</p><br/>


<img src="yell_Saturday22-2230.png"/>

In [6]:
%%html
<style> 
h1,h2,p,table,h4{font-family: Helmet, Freesans, Helvetica, Arial, sans-serif; }
h2{padding:0}
p,h4{ font-size: 18px;}
tr{font-size: 16px;}
</style> 