# Advanced Big Data Analytics: Project 1: Taxi Trip Analysis

### Team Members:
* **Name:** *Salma Khaled Hegazy* - **ID:** *221002185*
* **Name:** *Monier Ashraf Monier* - **ID:** *221002188*
* **Name:** *Tawfik Yasser Tawfik* - **ID:** *221002143*

In [1]:
# Static variables
geoDataPath = "/home/tawfik/SSDData/NU/Research/Courses/ABDA/P1/nyc-boroughs.geojson"
mainDatasetPath = "/home/tawfik/SSDData/NU/Research/Courses/ABDA/P1/sample.csv"

In [2]:
# Importing libs
from pyspark.sql import SparkSession
import json
#!pip install shapely
from shapely.geometry import shape, Point
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, lag
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.window import Window

In [3]:
spark = SparkSession \
    .builder \
    .appName("Project #1") \
    .getOrCreate()

23/03/18 12:32:34 WARN Utils: Your hostname, tawfik resolves to a loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface enp0s31f6)
23/03/18 12:32:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/18 12:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Defining the schema for the rides dataset

schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("vendor_id", StringType(), True),
    StructField("rate_code", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True)
])

In [5]:
# Reading the rides dataset in `rides` dataframe
rides = spark.read\
               .format("csv")\
               .option("header", "true")\
               .schema(schema)\
               .load(mainDatasetPath)

In [6]:
rides.toPandas().head(5)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


In [7]:
# Describing the rides dataset
rides.describe().toPandas()

                                                                                

Unnamed: 0,summary,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,count,99999,99999,99999,99999.0,373,99999.0,99999.0,99999.0,99999.0,99999.0,99999.0,99999.0
1,mean,,,,1.04210042100421,,2.1630016300163,650.639736397364,2.985983259832584,-72.65573395207979,40.04864264028657,-72.55315789414786,39.922430157271634
2,stddev,,,,0.2854981654227779,,1.7398882948433545,469.5655711271287,3.595598165741431,9.78401220373578,5.795547771378339,10.154353047085426,14.806522847291616
3,min,000318C2E3E6381580E5C99910A60668,001C8AAB90AEE49F36FCAA7B4136C81A,CMT,1.0,N,0.0,0.0,0.0,-98.116669,0.0,-95.650002,-3113.7888
4,max,FFFECF75AB6CC4FF9E8A8B633AB81C26,FFFBCEA3D4E21E05902EE67AD556F67C,VTS,5.0,Y,6.0,9180.0,52.88,0.0,646.43829,0.0,652.72314


In [8]:
# Size of the rides dataset = 16.6 MB
# Number of rows = 99999
# Machine cores = 4
# Number of partitions = 4 (rides.rdd.getNumPartitions())

# We will repartition the dataset to 8 partitions

rides = rides.repartition(8)

In [9]:
# Reading the geo dataset in `boroughs` from the json file

with open(geoDataPath, 'r') as f:
    boroughs = json.load(f)

In [19]:
# Reading the first feature in the geo dataset

print(json.dumps(boroughs['features'][0], indent=4))

polygons = []

current_code = 0
current_name = ''
index = -1

# We will iterate over the features in the geo dataset
# For each feature, we will extract the borough code and name
# We will create a new polygon for each borough code, if it doesn't exist, else we will add the polygon to the existing one
# We will add the polygon to the list of polygons for that borough code

for feature in boroughs['features']:
    if current_code == feature['properties']['boroughCode']:
        polygons[index]['polygon'] = polygons[index]['polygon'].union(shape(feature['geometry']))
        continue
    current_code = feature['properties']['boroughCode']
    current_name = feature['properties']['borough']
    polygons.append({'code':current_code,'name':current_name,'polygon':shape(feature['geometry'])})
    index+=1

{
    "type": "Feature",
    "id": 0,
    "properties": {
        "boroughCode": 5,
        "borough": "Staten Island",
        "@id": "http://nyc.pediacities.com/Resource/Borough/Staten_Island"
    },
    "geometry": {
        "type": "Polygon",
        "coordinates": [
            [
                [
                    -74.05050806403247,
                    40.566422034160816
                ],
                [
                    -74.04998352562575,
                    40.56639592492827
                ],
                [
                    -74.04931640362088,
                    40.56588774778044
                ],
                [
                    -74.04923629842045,
                    40.5653627363681
                ],
                [
                    -74.05002620158643,
                    40.565318180621134
                ],
                [
                    -74.05090601705089,
                    40.5660943421306
                ],
                [
      

In [20]:
# Sorting the polygons by the biggest area first

polygons.sort(key=lambda x: x['polygon'].area, reverse=True)

# Broadcasting the union_polygons to all the workers

polygons = spark.sparkContext.broadcast(polygons)

# Defining the function to get the borough name from the coordinates

def get_borough_name(longitude, latitude):
    point = Point(longitude, latitude)
    for borough in polygons.value:
        if borough['polygon'].contains(point):
            return borough['name']
    return 'Unknown'

In [21]:
# Creating a UDF to get the borough name from the coordinates

get_borough_name_udf = udf(get_borough_name, StringType())

### TASK 1

In [22]:
# Adding 2 columns using the UDF:
# pickup_borough: the name of the borough where the pickup point is located
# dropoff_borough: the name of the borough where the dropoff point is located

rides = rides.withColumn('pickup_borough', get_borough_name_udf(rides['pickup_longitude'], rides['pickup_latitude']))

rides = rides.withColumn('dropoff_borough', get_borough_name_udf(rides['dropoff_longitude'], rides['dropoff_latitude']))

In [23]:
# Keeping only rows with ride_duration < 4 hours or 240 minutes or 14400 seconds

rides = rides.filter(rides['trip_time_in_secs'] <= 14400) # 4 hours = 240 minutes = 14400 seconds

In [24]:
# Drop rows with Unknown boroughs in pickup or dropoff boroughs 

rides = rides.filter(rides['pickup_borough'] != 'Unknown')

rides = rides.filter(rides['dropoff_borough'] != 'Unknown')

In [25]:
rides.toPandas().head(1)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,pickup_borough,dropoff_borough
0,6FC85A1A4C9E9824D0D1A10532166A81,DCEC660967D86477A1AD47AB84A4C1A0,VTS,1,,2013-01-13 06:19:00,2013-01-13 06:46:00,1,1620,20.0,-73.786118,40.641403,-74.023125,40.616669,Queens,Brooklyn


In [26]:
# Calculating the previous dropoff_datetime for each hack_license, 
# and adding it as a new column it will be used difference between the start of the ride and the end of the previous ride
# and finally we will aggergate over the difference to get the average difference

windowBoundary  = Window.partitionBy("hack_license").orderBy("pickup_datetime")

rides = rides.withColumn("dropoff_LAG",lag("dropoff_datetime",1).over(windowBoundary)) 

rides.select('hack_license', 'pickup_datetime', 'pickup_borough', 'dropoff_datetime', 'dropoff_borough', 'dropoff_LAG').limit(10).show()



+--------------------+-------------------+--------------+-------------------+---------------+-------------------+
|        hack_license|    pickup_datetime|pickup_borough|   dropoff_datetime|dropoff_borough|        dropoff_LAG|
+--------------------+-------------------+--------------+-------------------+---------------+-------------------+
|006313464EC98A24B...|2013-01-13 07:36:00|     Manhattan|2013-01-13 07:48:00|      Manhattan|               null|
|006313464EC98A24B...|2013-01-13 08:16:00|     Manhattan|2013-01-13 08:20:00|      Manhattan|2013-01-13 07:48:00|
|006313464EC98A24B...|2013-01-13 08:51:00|     Manhattan|2013-01-13 08:53:00|      Manhattan|2013-01-13 08:20:00|
|006313464EC98A24B...|2013-01-13 10:03:00|     Manhattan|2013-01-13 10:07:00|      Manhattan|2013-01-13 08:53:00|
|006313464EC98A24B...|2013-01-13 11:31:00|     Manhattan|2013-01-13 11:35:00|      Manhattan|2013-01-13 10:07:00|
|006313464EC98A24B...|2013-01-13 13:04:00|     Manhattan|2013-01-13 13:31:00|      Manha

                                                                                

In [27]:
# Calculating the idle time between the end of the previous ride and the start of the current ride
rides = rides.withColumn("taxi_idle_time", (unix_timestamp(rides.pickup_datetime) - unix_timestamp(rides.dropoff_LAG))/60)

rides.select('hack_license', 'pickup_datetime', 'pickup_borough', 'dropoff_datetime', 'dropoff_borough', 'dropoff_LAG', 'taxi_idle_time').limit(10).show()



+--------------------+-------------------+--------------+-------------------+---------------+-------------------+--------------+
|        hack_license|    pickup_datetime|pickup_borough|   dropoff_datetime|dropoff_borough|        dropoff_LAG|taxi_idle_time|
+--------------------+-------------------+--------------+-------------------+---------------+-------------------+--------------+
|006313464EC98A24B...|2013-01-13 07:36:00|     Manhattan|2013-01-13 07:48:00|      Manhattan|               null|          null|
|006313464EC98A24B...|2013-01-13 08:16:00|     Manhattan|2013-01-13 08:20:00|      Manhattan|2013-01-13 07:48:00|          28.0|
|006313464EC98A24B...|2013-01-13 08:51:00|     Manhattan|2013-01-13 08:53:00|      Manhattan|2013-01-13 08:20:00|          31.0|
|006313464EC98A24B...|2013-01-13 10:03:00|     Manhattan|2013-01-13 10:07:00|      Manhattan|2013-01-13 08:53:00|          70.0|
|006313464EC98A24B...|2013-01-13 11:31:00|     Manhattan|2013-01-13 11:35:00|      Manhattan|2013

                                                                                

In [28]:
# Calculating the average idle time per destination borough for each hack_license [avg_idle_time_per_destination_borough appreviated as AITPDB]
rides.groupBy('dropoff_borough').agg(F.avg('taxi_idle_time').alias('AITPDB')).show() # AITPDB = Average Idle Time Per Destination Borough in minutes



+---------------+------------------+
|dropoff_borough|            AITPDB|
+---------------+------------------+
|         Queens| 71.39703972209635|
|       Brooklyn| 79.60176863041289|
|  Staten Island|21.333333333333332|
|      Manhattan| 33.67569492003802|
|          Bronx| 93.73684210526316|
+---------------+------------------+



                                                                                

### TASK #2

In [29]:
# Count rides if the pickup and dropoff boroughs are the same

rides.filter(rides['pickup_borough'] == rides['dropoff_borough']).count()

                                                                                

86074

### TASK #3

In [30]:
# Count rides if started in one borough and ended in another borough

rides.filter(rides['pickup_borough'] != rides['dropoff_borough']).count()

                                                                                

11433