In [1]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.rdd import portable_hash
from pyspark.statcounter import StatCounter
from pyspark.sql.types import StringType,IntegerType,FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import to_timestamp, current_timestamp, col,expr,unix_timestamp,round,when

import os
import json

from datetime import datetime
#pip install shapely
from shapely.geometry import shape, Point
from matplotlib import pyplot as plt
spark = SparkSession.builder.appName("Taxi")\
        .config("spark.driver.memory", "4g")\
        .config("spark.driver.cores", "4")\
        .getOrCreate()
sc=spark.sparkContext

In [6]:
from  pprint import pprint
def title(s):
    pprint("---- %s -----" %s)    
    
def see(s, v):
    pprint("---- %s -----" %s)
    pprint(v)

# Geospatial and Temporal Data Analysis on New York City Taxi Trip Data


- One statistic that is important to understanding the economics of taxis is utilization: the fraction of time that a cab is on the road and is occupied by one or more passen‐ gers. One factor that impacts utilization is the passenger’s destination: a cab that drops off passengers near Union Square at midday is much more likely to find its next fare in just a minute or two, whereas a cab that drops someone off at 2AM on Staten Island may have to drive all the way back to Manhattan before it finds its next fare.

- <b>Our Goal</b>  : We’d like to quantify these effects and find out the average time it takes for a cab to find its next fare as a function of the borough in which it dropped its passengers off—Manhattan, Brooklyn, Queens, the Bronx, Staten Island, or none of the above (e.g., if it dropped the passenger off somewhere outside of the city, like Newark Inter‐ national Airport).

- To carry out this analysis, we need to deal with two types of data that come up all the time: temporal data, such as dates and times, and geospatial information, like points of longitude and latitude and spatial boundaries.

# Dataset
- you can download the NYC taxi dataset from the link http://www.andresmh.com/nyctaxitrips/ then take a sample from it.
- Or use directly the sampled versiod already made uploaded on canvas
- Each row of the file after the header represents a single taxi ride in CSV format. For each ride, we have some attributes of the cab (a hashed version of the medallion num‐ ber) as well as the driver (a hashed version of the hack license, which is what licenses to drive taxis are called), some temporal information about when the trip started and ended, and the longitude/latitude coordinates for where the passenger(s) were picked up and dropped off.

In [3]:
#Let's Read the sampled dataset
#Sample the Data
df = spark.read.csv("ch08-geospatial/sample.csv", header=True)
df.limit(3).toPandas()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002


# Preparing the New York City Taxi Trip Data
- We are mainly intersted in each Trip's:
    - Some Unique ID for the car (license)
    - Pick-up location
    - Pick-up time
    - Drop-off location
    - Drop-off time

In [4]:
taxi=df.select("hack_license","dropoff_longitude","dropoff_latitude","dropoff_datetime","pickup_longitude","pickup_latitude","pickup_datetime")
taxi.limit(3).toPandas()

Unnamed: 0,hack_license,dropoff_longitude,dropoff_latitude,dropoff_datetime,pickup_longitude,pickup_latitude,pickup_datetime
0,BA96DE419E711691B9445D6A6307C170,-73.989838,40.751171,2013-01-01 15:18:10,-73.978165,40.757977,2013-01-01 15:11:48
1,9FD8F69F0804BDB5549F40E9DA1BE472,-73.994499,40.75066,2013-01-06 00:22:54,-74.006683,40.731781,2013-01-06 00:18:35
2,9FD8F69F0804BDB5549F40E9DA1BE472,-74.009834,40.726002,2013-01-05 18:54:23,-74.004707,40.73777,2013-01-05 18:49:41


# Intro to GeoJSON
The data we’ll use for the boundaries of boroughs in New York City comes written in a format called GeoJSON. The core object in GeoJSON is called a feature, which is made up of a geometry instance and a set of key-value pairs called properties. A geometry is a shape like a point, line, or polygon. A set of features is called a FeatureCollection. Let’s pull down the GeoJSON data for the NYC borough maps and take a look at its structure.

In [8]:
#Get a list of NY Zones
with open('ch08-geospatial/nyc-boroughs.geojson', 'r') as f:
        geo = json.load(f)
features = geo['features']


see("features", features[0])

'---- features -----'
{'geometry': {'coordinates': [[[-74.05050806403247, 40.566422034160816],
                               [-74.04998352562575, 40.56639592492827],
                               [-74.04931640362088, 40.56588774778044],
                               [-74.04923629842045, 40.5653627363681],
                               [-74.05002620158643, 40.565318180621134],
                               [-74.05090601705089, 40.5660943421306],
                               [-74.05067916748614, 40.5663108457364],
                               [-74.05107159803778, 40.5667224933978],
                               [-74.05050806403247, 40.566422034160816]]],
              'type': 'Polygon'},
 'id': 0,
 'properties': {'@id': 'http://nyc.pediacities.com/Resource/Borough/Staten_Island',
                'borough': 'Staten Island',
                'boroughCode': 5},
 'type': 'Feature'}


# Adding a shape feature

In [9]:
for f in features:
    f["shape"] = shape(f['geometry'])

In [10]:
features[0]

{'type': 'Feature',
 'id': 0,
 'properties': {'boroughCode': 5,
  'borough': 'Staten Island',
  '@id': 'http://nyc.pediacities.com/Resource/Borough/Staten_Island'},
 'geometry': {'type': 'Polygon',
  'coordinates': [[[-74.05050806403247, 40.566422034160816],
    [-74.04998352562575, 40.56639592492827],
    [-74.04931640362088, 40.56588774778044],
    [-74.04923629842045, 40.5653627363681],
    [-74.05002620158643, 40.565318180621134],
    [-74.05090601705089, 40.5660943421306],
    [-74.05067916748614, 40.5663108457364],
    [-74.05107159803778, 40.5667224933978],
    [-74.05050806403247, 40.566422034160816]]]},
 'shape': <shapely.geometry.polygon.Polygon at 0x7faea5e960f0>}

# Sort zones by area and broadcast to executors

In [13]:
areaSortedFeatures = sorted(features, key=lambda f: (int(f['properties']["boroughCode"]), f["shape"].area), reverse=True)
see("areaSortedFeatures", areaSortedFeatures[0:3])

'---- areaSortedFeatures -----'
[{'geometry': {'coordinates': [[[-74.08221272914938, 40.64828016229008],
                                [-74.08142228203805, 40.64850472594939],
                                [-74.08072838762374, 40.64827487384626],
                                [-74.07980996428705, 40.648383312987924],
                                [-74.07899546333259, 40.648142554422414],
                                [-74.0765065715286, 40.646968818183346],
                                [-74.074452825637, 40.645067488723235],
                                [-74.07395839976468, 40.645193205445516],
                                [-74.07359107919278, 40.64499892804299],
                                [-74.07349851367621, 40.6450833734751],
                                [-74.07385653684726, 40.645424816099606],
                                [-74.07333813856985, 40.64578311616224],
                                [-74.07324539464854, 40.64570492526951],
                 

In [15]:
bFeatures = sc.broadcast(areaSortedFeatures)
# Access the broadcast variable value through value.
bFeaturesValue = bFeatures.value

In [16]:
print(type(areaSortedFeatures)) 
print(type(bFeatures))
print(type(bFeatures.value))

<class 'list'>
<class 'pyspark.broadcast.Broadcast'>
<class 'list'>


# Working with Third-Party Libraries in Spark
- Convert Longitude and latitude to a zone depending on UDF
- We are going to use shapely library written in python
- For better performance you should use a java alternative (Read Advanced anlytics with spark book starting page 172) <b> and this is a bonus assignment: convert the following UDF to the java version </b>

One strategy that experienced data scientists deploy when working with a new data set is to add a try-catch block to their parsing code so that any invalid records can be written out to the logs without causing the entire job to fail. If there are only a handful of invalid records in the entire data set, we might be okay with ignoring them and continuing with our analysis. With Spark, we can do even better: we can adapt our parsing code so that we can interactively analyze the invalid records in our data just as easily as we would perform any other kind of analysis.

In [17]:
#Convert longitude and latitude to point(shapely point)
def locArea(long,latit):
    try:
        pointLocation = Point(float(long), float(latit))
    except ValueError:
        pointLocation = Point(0.0, 0.0)
    return pointLocation

#check and convert point to borough
def borough(long,latit,areaSortedFeatures):
    for f in areaSortedFeatures:
        #contains checks if the 
        if f['shape'].contains(locArea(long,latit)):
            return str(f['properties']["borough"])
    return None

def toPoint(x,y):
    return Point(float(x), float(y))

In [19]:
pickup = udf(lambda long,latit :  borough(long,latit,bFeaturesValue) , StringType())
taxiParsed=taxi.withColumn("pickupArea" ,pickup("pickup_longitude","pickup_latitude")).withColumn("dropoffArea" ,pickup("dropoff_longitude","dropoff_latitude"))
taxiParsed.limit(3).toPandas()

Unnamed: 0,hack_license,dropoff_longitude,dropoff_latitude,dropoff_datetime,pickup_longitude,pickup_latitude,pickup_datetime,pickupArea,dropoffArea
0,BA96DE419E711691B9445D6A6307C170,-73.989838,40.751171,2013-01-01 15:18:10,-73.978165,40.757977,2013-01-01 15:11:48,Manhattan,Manhattan
1,9FD8F69F0804BDB5549F40E9DA1BE472,-73.994499,40.75066,2013-01-06 00:22:54,-74.006683,40.731781,2013-01-06 00:18:35,Manhattan,Manhattan
2,9FD8F69F0804BDB5549F40E9DA1BE472,-74.009834,40.726002,2013-01-05 18:54:23,-74.004707,40.73777,2013-01-05 18:49:41,Manhattan,Manhattan


#   Explore the distribution of trip duration (hours)
Given the temporal nature of our trip data, one reasonable invariant that we can expect is that the dropoff time for any trip will be sometime after the pickup time. We might also expect that trips will not take more than a few hours to complete, although it’s certainly possible that long trips, trips that take place during rush hour, or trips that are delayed by accidents could go on for several hours. We’re not exactly sure what the cutoff should be for a trip that takes a “reasonable” amount of time.let's explore

In [20]:
#unix_timeStamp: Convert time string with given pattern (‘yyyy-MM-dd HH:mm:ss’, by default) 
#to Unix time stamp (in seconds),
#using the default timezone and the default locale, return null if fail.
taxiWithTS = taxiParsed.withColumn('pickupTime',unix_timestamp(to_timestamp(col('pickup_datetime'))))
taxiWithTS = taxiWithTS.withColumn('dropoffTime',unix_timestamp(to_timestamp(col('dropoff_datetime'))))

In [21]:
taxiParsedSelected=taxiWithTS.select("hack_license","pickupArea","pickupTime","dropoffArea","dropoffTime")

In [22]:
taxiParsedSelected.printSchema()

root
 |-- hack_license: string (nullable = true)
 |-- pickupArea: string (nullable = true)
 |-- pickupTime: long (nullable = true)
 |-- dropoffArea: string (nullable = true)
 |-- dropoffTime: long (nullable = true)



In [23]:
taxiParsedSelected.show(3)

+--------------------+----------+----------+-----------+-----------+
|        hack_license|pickupArea|pickupTime|dropoffArea|dropoffTime|
+--------------------+----------+----------+-----------+-----------+
|BA96DE419E711691B...| Manhattan|1357053108|  Manhattan| 1357053490|
|9FD8F69F0804BDB55...| Manhattan|1357431515|  Manhattan| 1357431774|
|9FD8F69F0804BDB55...| Manhattan|1357411781|  Manhattan| 1357412063|
+--------------------+----------+----------+-----------+-----------+
only showing top 3 rows



In [24]:
#diff = unix_timestamp(to_timestamp(col('dropoff_datetime')))-unix_timestamp(to_timestamp(col('pickup_datetime')))
diff = col('dropoffTime')-col('pickupTime')
taxiWithHour = taxiParsedSelected.withColumn("hour",(round(diff/3600)))
taxiWithHour.show(3)

+--------------------+----------+----------+-----------+-----------+----+
|        hack_license|pickupArea|pickupTime|dropoffArea|dropoffTime|hour|
+--------------------+----------+----------+-----------+-----------+----+
|BA96DE419E711691B...| Manhattan|1357053108|  Manhattan| 1357053490| 0.0|
|9FD8F69F0804BDB55...| Manhattan|1357431515|  Manhattan| 1357431774| 0.0|
|9FD8F69F0804BDB55...| Manhattan|1357411781|  Manhattan| 1357412063| 0.0|
+--------------------+----------+----------+-----------+-----------+----+
only showing top 3 rows



In [25]:
taxiWithHour.groupBy("hour").count().show()

+----+-----+
|hour|count|
+----+-----+
| 0.0|96615|
| 1.0| 3378|
| 3.0|    1|
| 2.0|    5|
+----+-----+



#  Remove trips with -ve durations and longer than 3 hours

In [26]:
#Cache as we are going to use it many times
taxiClean=taxiWithHour.filter("hour >= 0 and hour <3").cache()
taxiClean.limit(3).toPandas()

Unnamed: 0,hack_license,pickupArea,pickupTime,dropoffArea,dropoffTime,hour
0,BA96DE419E711691B9445D6A6307C170,Manhattan,1357053108,Manhattan,1357053490,0.0
1,9FD8F69F0804BDB5549F40E9DA1BE472,Manhattan,1357431515,Manhattan,1357431774,0.0
2,9FD8F69F0804BDB5549F40E9DA1BE472,Manhattan,1357411781,Manhattan,1357412063,0.0


In [27]:
boroughCount=taxiClean.groupBy('dropoffArea').count().orderBy('count' , ascending=False)
boroughCount.show()

+-------------+-----+
|  dropoffArea|count|
+-------------+-----+
|    Manhattan|88163|
|       Queens| 5475|
|     Brooklyn| 3595|
|         null| 2357|
|        Bronx|  395|
|Staten Island|   13|
+-------------+-----+



# Sessionize trips

- Look at sorted list of trips for each driver
- Once the difference between any 2 consecurtive trips is longer than 4 hours, we consider that a new session has started
- Our goal, from many pages ago, was to investigate the relationship between the bor‐ ough in which a driver drops his passenger off and the amount of time it takes to acquire another fare. At this point, the taxiDone data set contains all of the individual trips for each taxi driver in individual records distributed across different partitions of the data. To compute the length of time between the end of one ride and the start of the next one, we need to aggregate all of the trips from a shift by a single driver into a single record, and then sort the trips within that shift by time. The sort step allows us to compare the dropoff time of one trip to the pickup time of the next trip. This kind of analysis, in which we want to analyze a single entity as it executes a series of events over time, is called sessionization, and is commonly performed over web logs to analyze the behavior of the users of a website.


In [28]:
#calculating the difference between the pickup time and the previous dropoff loc and put it in another column
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

#partition by driver to make sure to order the trips done by the same driver
w = Window.partitionBy("hack_license").orderBy("pickupTime")

diff = col("pickupTime") - lag("dropoffTime", 1).over(w)

taxiCleanwithdifferenceTime=taxiClean.withColumn("diff", diff)
taxiCleanwithdifferenceTime.limit(20).toPandas()

Unnamed: 0,hack_license,pickupArea,pickupTime,dropoffArea,dropoffTime,hour,diff
0,02856AFC22881ABCADDD5284BADDEB8D,Manhattan,1358044920,Manhattan,1358045460,0.0,
1,02856AFC22881ABCADDD5284BADDEB8D,Manhattan,1358047920,Manhattan,1358048460,0.0,2460.0
2,02856AFC22881ABCADDD5284BADDEB8D,Manhattan,1358048640,Manhattan,1358049420,0.0,180.0
3,03A2D28F831C5C3E590F9E4A511BD3B1,Manhattan,1358063700,Manhattan,1358064000,0.0,
4,03A2D28F831C5C3E590F9E4A511BD3B1,Manhattan,1358069580,Manhattan,1358069760,0.0,5580.0
5,03A2D28F831C5C3E590F9E4A511BD3B1,Manhattan,1358070420,Manhattan,1358071320,0.0,660.0
6,03A2D28F831C5C3E590F9E4A511BD3B1,Manhattan,1358075340,Manhattan,1358076000,0.0,4020.0
7,03A2D28F831C5C3E590F9E4A511BD3B1,Manhattan,1358076120,Manhattan,1358076300,0.0,120.0
8,03A2D28F831C5C3E590F9E4A511BD3B1,Manhattan,1358076900,Queens,1358079120,1.0,600.0
9,03A2D28F831C5C3E590F9E4A511BD3B1,Queens,1358085060,Queens,1358085840,0.0,5940.0


In [29]:
taxiCleanwithdifferenceTimewithouNulls=taxiCleanwithdifferenceTime.na.fill({"diff" : 0.0})
taxiCleanwithdifferenceTimewithouNulls.show(10)

+--------------------+----------+----------+-----------+-----------+----+----+
|        hack_license|pickupArea|pickupTime|dropoffArea|dropoffTime|hour|diff|
+--------------------+----------+----------+-----------+-----------+----+----+
|02856AFC22881ABCA...| Manhattan|1358044920|  Manhattan| 1358045460| 0.0|   0|
|02856AFC22881ABCA...| Manhattan|1358047920|  Manhattan| 1358048460| 0.0|2460|
|02856AFC22881ABCA...| Manhattan|1358048640|  Manhattan| 1358049420| 0.0| 180|
|03A2D28F831C5C3E5...| Manhattan|1358063700|  Manhattan| 1358064000| 0.0|   0|
|03A2D28F831C5C3E5...| Manhattan|1358069580|  Manhattan| 1358069760| 0.0|5580|
|03A2D28F831C5C3E5...| Manhattan|1358070420|  Manhattan| 1358071320| 0.0| 660|
|03A2D28F831C5C3E5...| Manhattan|1358075340|  Manhattan| 1358076000| 0.0|4020|
|03A2D28F831C5C3E5...| Manhattan|1358076120|  Manhattan| 1358076300| 0.0| 120|
|03A2D28F831C5C3E5...| Manhattan|1358076900|     Queens| 1358079120| 1.0| 600|
|03A2D28F831C5C3E5...|    Queens|1358085060|     Que

In [30]:
#4 hrs = 14400.0 sec 
TaxiIdleTime = taxiCleanwithdifferenceTimewithouNulls.withColumn("idleTime",when(col("diff")>=14400,0).otherwise(col("diff")))
TaxiIdleTime.show(5)

+--------------------+----------+----------+-----------+-----------+----+----+--------+
|        hack_license|pickupArea|pickupTime|dropoffArea|dropoffTime|hour|diff|idleTime|
+--------------------+----------+----------+-----------+-----------+----+----+--------+
|02856AFC22881ABCA...| Manhattan|1358044920|  Manhattan| 1358045460| 0.0|   0|       0|
|02856AFC22881ABCA...| Manhattan|1358047920|  Manhattan| 1358048460| 0.0|2460|    2460|
|02856AFC22881ABCA...| Manhattan|1358048640|  Manhattan| 1358049420| 0.0| 180|     180|
|03A2D28F831C5C3E5...| Manhattan|1358063700|  Manhattan| 1358064000| 0.0|   0|       0|
|03A2D28F831C5C3E5...| Manhattan|1358069580|  Manhattan| 1358069760| 0.0|5580|    5580|
+--------------------+----------+----------+-----------+-----------+----+----+--------+
only showing top 5 rows



In [31]:
#it is clear that staten island and manhattan have least idle time
TaxiIdleTime.groupBy("dropoffArea").avg("idleTime").orderBy("avg(idleTime)").show()

+-------------+------------------+
|  dropoffArea|     avg(idleTime)|
+-------------+------------------+
|Staten Island|             240.0|
|    Manhattan| 869.9117997345826|
|     Brooklyn|1118.8603616133519|
|        Bronx|1148.3544303797469|
|         null|1248.9681798896902|
|       Queens|1351.7227397260274|
+-------------+------------------+

