In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# App Initialization

## Install the needed packages

In [2]:
pip install pyspark



In [3]:
pip install shapely



## Import the needed packages

In [4]:
from shapely.geometry import Point, Polygon
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lead, broadcast, from_utc_timestamp
from pyspark.sql.window import Window
#https://shapely.readthedocs.io/en/latest/manual.html

****

## Build a Spark application

In [5]:
spark = SparkSession \
    .builder \
    .appName("NYC Taxi Rides Utilization") \
    .getOrCreate()

In [6]:
spark

# Prepare the boroughs data

## Read the data

In [7]:
boroughs = spark.read.json("/content/drive/MyDrive/CIT-652 - Advanced Big Data/Project 01 -  NYC Taxi Rides Utilization/nyc-boroughs.geojson")
#boroughs = spark.read.json("/FileStore/tables/nyc_boroughs.geojson")
boroughs.limit(5).show()

+--------------------+--------------------+----+--------------------+-------+
|     _corrupt_record|            geometry|  id|          properties|   type|
+--------------------+--------------------+----+--------------------+-------+
|                   {|                null|null|                null|   null|
|"type": "FeatureC...|                null|null|                null|   null|
|       "features": [|                null|null|                null|   null|
|                null|{[[[-74.050508064...|   0|{http://nyc.pedia...|Feature|
|                   ,|                null|null|                null|   null|
+--------------------+--------------------+----+--------------------+-------+



In [8]:
boroughs = boroughs.drop('_corrupt_record','type').dropna()
boroughs.limit(5).show(truncate=True)
boroughs.printSchema

+--------------------+---+--------------------+
|            geometry| id|          properties|
+--------------------+---+--------------------+
|{[[[-74.050508064...|  0|{http://nyc.pedia...|
|{[[[-74.053140368...|  1|{http://nyc.pedia...|
|{[[[-74.159456024...|  2|{http://nyc.pedia...|
|{[[[-74.082212729...|  3|{http://nyc.pedia...|
|{[[[-73.836682741...|  4|{http://nyc.pedia...|
+--------------------+---+--------------------+



<bound method DataFrame.printSchema of DataFrame[geometry: struct<coordinates:array<array<array<double>>>,type:string>, id: bigint, properties: struct<@id:string,borough:string,boroughCode:bigint>]>

## Extract boroughs names and codes

In [9]:
get_borough_name = udf(lambda properties: properties[1])
get_borough_code = udf(lambda properties: properties[2])

boroughs = boroughs\
    .withColumn(
        'borough_name',
        get_borough_name(boroughs.properties))\
    .withColumn(
        'borough_code',
        get_borough_code(boroughs.properties))
    #.drop('properties')

boroughs.limit(5).show(truncate=True)

boroughs.count()

+--------------------+---+--------------------+-------------+------------+
|            geometry| id|          properties| borough_name|borough_code|
+--------------------+---+--------------------+-------------+------------+
|{[[[-74.050508064...|  0|{http://nyc.pedia...|Staten Island|           5|
|{[[[-74.053140368...|  1|{http://nyc.pedia...|Staten Island|           5|
|{[[[-74.159456024...|  2|{http://nyc.pedia...|Staten Island|           5|
|{[[[-74.082212729...|  3|{http://nyc.pedia...|Staten Island|           5|
|{[[[-73.836682741...|  4|{http://nyc.pedia...|       Queens|           4|
+--------------------+---+--------------------+-------------+------------+



104

## Sort boroughs by area

In [10]:
boroughs.select("borough_name", "borough_code").distinct().show()

+-------------+------------+
| borough_name|borough_code|
+-------------+------------+
|Staten Island|           5|
|        Bronx|           2|
|       Queens|           4|
|    Manhattan|           1|
|     Brooklyn|           3|
+-------------+------------+



In [11]:
# https://en.wikipedia.org/wiki/Boroughs_of_New_York_City

def area_sort(borough_name):
  if borough_name == "Queens":
    return 1
  if borough_name == "Brooklyn":
    return 2
  if borough_name == "Staten Island":
    return 3
  if borough_name == "Bronx":
    return 4
  if borough_name == "Manhattan":
    return 5

In [12]:
area_sort_UDF = udf(lambda borough_name: area_sort(borough_name))

In [13]:
boroughs = boroughs.withColumn("area_sort", area_sort_UDF(boroughs.borough_name))
boroughs.limit(5).show()

+--------------------+---+--------------------+-------------+------------+---------+
|            geometry| id|          properties| borough_name|borough_code|area_sort|
+--------------------+---+--------------------+-------------+------------+---------+
|{[[[-74.050508064...|  0|{http://nyc.pedia...|Staten Island|           5|        3|
|{[[[-74.053140368...|  1|{http://nyc.pedia...|Staten Island|           5|        3|
|{[[[-74.159456024...|  2|{http://nyc.pedia...|Staten Island|           5|        3|
|{[[[-74.082212729...|  3|{http://nyc.pedia...|Staten Island|           5|        3|
|{[[[-73.836682741...|  4|{http://nyc.pedia...|       Queens|           4|        1|
+--------------------+---+--------------------+-------------+------------+---------+



In [14]:
boroughs = boroughs.sort(boroughs.area_sort.desc())
boroughs.limit(5).show()

+--------------------+---+--------------------+------------+------------+---------+
|            geometry| id|          properties|borough_name|borough_code|area_sort|
+--------------------+---+--------------------+------------+------------+---------+
|{[[[-74.010928412...| 45|{http://nyc.pedia...|   Manhattan|           1|        5|
|{[[[-74.043877615...| 49|{http://nyc.pedia...|   Manhattan|           1|        5|
|{[[[-74.005003733...| 46|{http://nyc.pedia...|   Manhattan|           1|        5|
|{[[[-74.003820389...| 47|{http://nyc.pedia...|   Manhattan|           1|        5|
|{[[[-74.002975654...| 48|{http://nyc.pedia...|   Manhattan|           1|        5|
+--------------------+---+--------------------+------------+------------+---------+



## Create a tuple with boroughs and its polygons to be broadcasted

In [15]:
import numpy as np
boroughs_np_array =  np.array(boroughs.select("geometry", "borough_name").collect())

  


In [16]:
boroughs_tuple = ()

for i in range(0, len(boroughs_np_array)):
    coordinates_list = []
    for j in range(0, len(boroughs_np_array[i,0][0][0])):
      coordinates_list.append((float(boroughs_np_array[i,0][0][0][j][0]), float(boroughs_np_array[i,0][0][0][j][1])))
    
    # (borough_name 1, borough_polygon 1, borough_name 2, borough_polygon 2,...)
    boroughs_tuple = boroughs_tuple + (boroughs_np_array[i,1], Polygon(coordinates_list))

broadcastStates = spark.sparkContext.broadcast(boroughs_tuple)

# Prepare the rides data

## Read the data

In [17]:
rides = spark.read\
               .format("csv")\
               .option("header", "true")\
               .load("/content/drive/MyDrive/CIT-652 - Advanced Big Data/Project 01 -  NYC Taxi Rides Utilization/sample.csv")
                #.load("/FileStore/tables/sample.csv")\

rides.limit(5).show(truncate=False)

rides.count()

+--------------------------------+--------------------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|medallion                       |hack_license                    |vendor_id|rate_code|store_and_fwd_flag|pickup_datetime    |dropoff_datetime   |passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------------------+--------------------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AECF13C3F540D4CF4|BA96DE419E711691B9445D6A6307C170|CMT      |1        |N                 |2013-01-01 15:11:48|2013-01-01 15:18:10|4              |382              |1.00         |-73.978165      |

99999

## Remove Trips with negative time or more then 4 hours

In [18]:
rides = rides.where("trip_time_in_secs BETWEEN 1 AND 14400")
rides.count()

99550

## Keep the needed columns only

In [19]:
rides = rides.drop("hack_license", "vendor_id", "rate_code", "store_and_fwd_flag", "passenger_count", "trip_distance", "dropoff_datetime")
rides.limit(5).show(truncate=False)

+--------------------------------+-------------------+-----------------+----------------+---------------+-----------------+----------------+
|medallion                       |pickup_datetime    |trip_time_in_secs|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------------------+-------------------+-----------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AECF13C3F540D4CF4|2013-01-01 15:11:48|382              |-73.978165      |40.757977      |-73.989838       |40.751171       |
|0BD7C8F5BA12B88E0B67BED28BEA73D8|2013-01-06 00:18:35|259              |-74.006683      |40.731781      |-73.994499       |40.75066        |
|0BD7C8F5BA12B88E0B67BED28BEA73D8|2013-01-05 18:49:41|282              |-74.004707      |40.73777       |-74.009834       |40.726002       |
|DFD2202EE08F7A8DC9A57B02ACB81FE2|2013-01-07 23:54:15|244              |-73.974602      |40.759945      |-73.984734       |40.759388       |
|DFD2202EE08F

## Get pickup time in UNIX timestamp

In [20]:
from pyspark.sql.functions import unix_timestamp

rides = rides\
    .withColumn(
        'pickup_sec',
        unix_timestamp(rides.pickup_datetime))
    #.drop("pickup_datetime")

rides.limit(5).show(truncate=True)

+--------------------+-------------------+-----------------+----------------+---------------+-----------------+----------------+----------+
|           medallion|    pickup_datetime|trip_time_in_secs|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_sec|
+--------------------+-------------------+-----------------+----------------+---------------+-----------------+----------------+----------+
|89D227B655E5C82AE...|2013-01-01 15:11:48|              382|      -73.978165|      40.757977|       -73.989838|       40.751171|1357053108|
|0BD7C8F5BA12B88E0...|2013-01-06 00:18:35|              259|      -74.006683|      40.731781|       -73.994499|        40.75066|1357431515|
|0BD7C8F5BA12B88E0...|2013-01-05 18:49:41|              282|      -74.004707|       40.73777|       -74.009834|       40.726002|1357411781|
|DFD2202EE08F7A8DC...|2013-01-07 23:54:15|              244|      -73.974602|      40.759945|       -73.984734|       40.759388|1357602855|
|DFD2202EE08F7A8DC..

## Get next trip time

In [21]:
windowSpec  = Window.partitionBy("medallion").orderBy("pickup_sec")

rides = rides.withColumn(
    'next_trip_pickup_sec',
    lead("pickup_sec", 1).over(windowSpec)
    )

rides = rides.drop("pickup_datetime")

rides.limit(5).show(truncate=False)

+--------------------------------+-----------------+----------------+---------------+-----------------+----------------+----------+--------------------+
|medallion                       |trip_time_in_secs|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_sec|next_trip_pickup_sec|
+--------------------------------+-----------------+----------------+---------------+-----------------+----------------+----------+--------------------+
|0F621E366CFE63044BFED29EA126CDB9|960              |-73.932121      |40.857803      |-73.940109       |40.815514       |1358051460|1358060220          |
|0F621E366CFE63044BFED29EA126CDB9|720              |-73.951736      |40.78183       |-74.000549       |40.755203       |1358060220|1358061060          |
|0F621E366CFE63044BFED29EA126CDB9|420              |-74.004158      |40.75296       |-73.975716       |40.796326       |1358061060|1358063160          |
|0F621E366CFE63044BFED29EA126CDB9|960              |-73.988029      |40.754784    

## Romeve idle time with more that 4 hours

In [22]:
rides = rides.withColumn(
                "idle_time_sec",
                rides.next_trip_pickup_sec - rides.pickup_sec - rides.trip_time_in_secs
              )

rides = rides.withColumn(
                "idle_time_mins",
                rides.idle_time_sec/60
              )

rides = rides.where("idle_time_sec BETWEEN 0 AND 14400")

rides = rides.drop("pickup_sec", "next_trip_pickup_sec", "trip_time_in_secs")

rides.limit(5).show()

rides.count()

+--------------------+----------------+---------------+-----------------+----------------+-------------+--------------+
|           medallion|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|idle_time_sec|idle_time_mins|
+--------------------+----------------+---------------+-----------------+----------------+-------------+--------------+
|0F621E366CFE63044...|      -73.932121|      40.857803|       -73.940109|       40.815514|       7800.0|         130.0|
|0F621E366CFE63044...|      -73.951736|       40.78183|       -74.000549|       40.755203|        120.0|           2.0|
|0F621E366CFE63044...|      -74.004158|       40.75296|       -73.975716|       40.796326|       1680.0|          28.0|
|0F621E366CFE63044...|      -73.988029|      40.754784|        -73.97184|       40.679226|       1200.0|          20.0|
|0F621E366CFE63044...|      -73.978165|      40.745846|       -73.984222|       40.752197|       3780.0|          63.0|
+--------------------+----------------+-

90420

# Prepare the master dataframe (boroughs & rides)

## Create the needed functions

In [23]:
def get_borough(longitude, latitude):
  """
    This function get the borough name of a point with the parameters (longitude, latitude) from the broadcasted polygons tubles
    It returns the name of the name of the borough is available
  """
  i = 0
  while i < len(boroughs_tuple):
    if Point(float(longitude), float(latitude)).within(boroughs_tuple[i+1]):
      return boroughs_tuple[i]
    i = i + 2

In [24]:
get_borough_udf = udf(lambda longitude, latitude: get_borough(longitude, latitude))

## Add origin borough

In [25]:
rides = rides.withColumn("from_borough", get_borough_udf(rides.pickup_longitude, rides.pickup_latitude))

## Add destination borough

In [26]:
rides = rides.withColumn("to_borough", get_borough_udf(rides.dropoff_longitude, rides.dropoff_latitude))

## Check rides with unknown origin/destination

In [27]:
rides.filter("from_borough is NULL").count()

1459

In [28]:
rides.filter("to_borough is NULL").count()

1775

## Drop the rides with unknown origin/destination

In [29]:
rides = rides.filter("from_borough IS NOT NULL AND to_borough IS NOT NULL")

## Select the needed columns

In [30]:
rides.columns

['medallion',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'idle_time_sec',
 'idle_time_mins',
 'from_borough',
 'to_borough']

In [None]:
data = rides.select("idle_time_mins", "from_borough", "to_borough")
data.limit(5).show()
data.count()

# Tasks solutions

## Task 01: The average time it takes for taxi to find it next trip per destination borough

In [None]:
data.groupBy("to_borough").avg("idle_time_mins").show()

## Task 02: The number of trips that started and ended within same borough

In [None]:
data.where("from_borough = to_borough").count()

## Task 03: The number of trips that start in one borough and ended in another one


In [None]:
data.where("from_borough <> to_borough").count()