## Analyzing New York City Taxi Rides
### Author: Zynab Smaan.

In [1]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from shapely.geometry import Point, Polygon
from pyspark.context import SparkContext

In [2]:
# Obtain the Spark session for this notebook via the builder's getOrCreate().
spark = (
    SparkSession.builder
    .appName("Analyzing New york city trips")
    .getOrCreate()
)

In [3]:
# Load the trip records from CSV, treating the first line as the header.
# No schema is supplied, so every column is read as a string.
trips = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("sample.csv")
)

In [4]:
# Print the column names and types of the freshly loaded trips DataFrame.
trips.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



## Cleaning the data 

#### 1- Drop columns we don't need.

In [5]:
# Columns that the rest of the analysis does not use.
columns_to_drop = [
    'medallion', 'vendor_id', 'rate_code', 'store_and_fwd_flag',
    'passenger_count', 'trip_distance', 'trip_time_in_secs',
]
trips = trips.drop(*columns_to_drop)
# Display the columns that remain.
trips.columns

['hack_license',
 'pickup_datetime',
 'dropoff_datetime',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude']

In [6]:
# Preview a single row of the remaining columns.
trips.show(n=1)

+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+
|        hack_license|    pickup_datetime|   dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+
|BA96DE419E711691B...|2013-01-01 15:11:48|2013-01-01 15:18:10|      -73.978165|      40.757977|       -73.989838|       40.751171|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+
only showing top 1 row



##### The size of the data before filtering.

In [7]:
# Number of rows before any cleaning step is applied.
trips.count()

99999

#### 2- Dropping rows with null values.

In [8]:
# Discard every row that has a null in at least one column.
trips = trips.dropna(how="any")


In [9]:
# Row count after dropping the rows that contain nulls.
trips.count()

99999

#### 3- Creating a duration (in seconds) column for each trip.

In [10]:
# Trip duration in seconds: dropoff time minus pickup time, both parsed from
# their string form into Unix timestamps.
timestamp_format = "yyyy-MM-dd HH:mm:ss"
dropoff_seconds = unix_timestamp("dropoff_datetime", timestamp_format).cast("double")
pickup_seconds = unix_timestamp("pickup_datetime", timestamp_format).cast("double")
trips = trips.withColumn("duration", dropoff_seconds - pickup_seconds)

In [11]:
# Preview the first five rows, now including the duration column.
trips.show(n=5)

+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+
|        hack_license|    pickup_datetime|   dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|duration|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+
|BA96DE419E711691B...|2013-01-01 15:11:48|2013-01-01 15:18:10|      -73.978165|      40.757977|       -73.989838|       40.751171|   382.0|
|9FD8F69F0804BDB55...|2013-01-06 00:18:35|2013-01-06 00:22:54|      -74.006683|      40.731781|       -73.994499|        40.75066|   259.0|
|9FD8F69F0804BDB55...|2013-01-05 18:49:41|2013-01-05 18:54:23|      -74.004707|       40.73777|       -74.009834|       40.726002|   282.0|
|51EE87E3205C985EF...|2013-01-07 23:54:15|2013-01-07 23:58:20|      -73.974602|      40.759945|       -73.984734|       40.759388|   245.0|
|51EE87E3205C985EF..

#### 4- Filter out trips whose duration is not positive or is longer than 4 hours (4*60*60 = 14400 seconds).

In [12]:
# Keep only trips that last more than 0 seconds and at most 4 hours.
max_duration_seconds = 14400.0
trip_duration = col("duration")
trips = trips.where((trip_duration > 0.0) & (trip_duration <= max_duration_seconds))

##### The size of the data after filtering.

In [13]:
# Row count after removing trips with an out-of-range duration.
trips.count()

99550

**NOTES**
 - The data is now cleaned, and only the columns we need remain.
 - The number of rows dropped from 99999 to 99550 as a result of the cleaning.

## Load GEOJSON data

In [14]:
# Load the borough boundary GeoJSON file into a DataFrame.
geojson_data = spark.read.format("json").load("nyc-boroughs.geojson")

In [15]:
# Print the nested schema of the loaded GeoJSON DataFrame.
geojson_data.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: array (containsNull = true)
 |    |    |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- id: long (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- @id: string (nullable = true)
 |    |-- borough: string (nullable = true)
 |    |-- boroughCode: long (nullable = true)
 |-- type: string (nullable = true)



### Getting columns we need 

In [18]:
# Keep only each feature's borough name and its polygon coordinates, and
# discard rows where either of the two is missing.
cleaned_geojson_data = (
    geojson_data
    .select("properties.borough", "geometry.coordinates")
    .dropna()
)

In [19]:
# Number of borough polygon rows that remain after the cleanup.
cleaned_geojson_data.count()

104

In [20]:
# Show one borough row in full, without truncating the coordinates column.
cleaned_geojson_data.show(n=1, truncate=False)

+-------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|borough      |coordinates                                                                                                                                                                                                                                                                                                                                                                       |
+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Collect the borough names and coordinates to the driver (shared with the UDF)

In [34]:
# Collect the borough rows to the driver once and derive both lists from the
# same result. The two lists are zipped together when looking up a borough,
# so they must be index-aligned; a single collect guarantees that and avoids
# running the Spark job twice.
borough_rows = cleaned_geojson_data.collect()
b_borough_list = [row["borough"] for row in borough_rows]
b_coordinates_list = [row["coordinates"] for row in borough_rows]

In [35]:
def get_borough(lang, lat):
    """Return the name of the borough whose polygon contains the given point.

    Parameters
    ----------
    lang : str or float
        Longitude of the point (used as the x coordinate).
    lat : str or float
        Latitude of the point (used as the y coordinate).

    Returns
    -------
    str or None
        The borough paired with the first polygon in ``b_coordinates_list``
        that the point lies within, or ``None`` when the coordinates cannot
        be converted to numbers or no polygon contains the point.
    """
    try:
        p1 = Point(float(lang), float(lat))
    except (TypeError, ValueError):
        # A missing or malformed coordinate should yield "no borough" rather
        # than raise and abort the whole Spark job that runs this UDF.
        return None
    for b, c in zip(b_borough_list, b_coordinates_list):
        # Only the first ring of each coordinates entry is used to build the
        # polygon; any further rings are ignored.
        poly = Polygon(c[0])
        if p1.within(poly):
            return b
    # No polygon contains the point.
    return None

In [36]:
# Wrap get_borough as a Spark UDF that produces a string column.
get_borough_udf = udf(f=get_borough, returnType=StringType())

In [39]:
# Tag each trip with the borough of its pickup and dropoff points. The
# DataFrame is assigned first and displayed separately: show() returns None,
# so chaining it onto the assignment would leave `trips` set to None.
trips = (
    trips
    .withColumn("pickup_destination",
                get_borough_udf("pickup_longitude", "pickup_latitude"))
    .withColumn("dropoff_destination",
                get_borough_udf("dropoff_longitude", "dropoff_latitude"))
)
trips.show(10)

+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+------------------+-------------------+
|        hack_license|    pickup_datetime|   dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|duration|pickup_destination|dropoff_destination|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+------------------+-------------------+
|BA96DE419E711691B...|2013-01-01 15:11:48|2013-01-01 15:18:10|      -73.978165|      40.757977|       -73.989838|       40.751171|   382.0|         Manhattan|          Manhattan|
|9FD8F69F0804BDB55...|2013-01-06 00:18:35|2013-01-06 00:22:54|      -74.006683|      40.731781|       -73.994499|        40.75066|   259.0|         Manhattan|          Manhattan|
|9FD8F69F0804BDB55...|2013-01-05 18:49:41|2013-01-05 18:54:23|      -74.004707|       40.73777|       -74