
# SPBD-23/24 Project Assignment
#### version 1.0 - 15/11 (Final)


## GROUP - Ana Carolina Condez (67145); Duarte Vinha (67175); Katarina Dyreby (67156)


The project scenario involves a dataset of taxi rides collected in December 2022 in the New York City area.

Each completed taxi ride corresponds to an event in the dataset. A ride comprises several items of information, including the pick-up and drop-off zones/regions within NY City, their respective timestamps, as well as information related to the payment and number of passengers reported by the driver. The full explanation of the available data is provided [here](https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf).

A table to convert zone identifiers into proper names is found [here](https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv).

The project assignment comprises a set of queries, all of which must be solved using the Spark SQL DataFrames API. One query **of your choice** must be solved twice more: once using Spark Core (mandatory) and once using either the SQL flavor of Spark SQL or mrjob.


# Deadline
 + 8th December - 23h59
 + For each day late, a penalty of 0.5/20 grade points applies.

In [None]:
# @title Download Dataset and Taxi Zone Information
!wget -q -O taxirides.csv.gz https://shorturl.at/mzHKY
!wget -q -O taxi+_zone_lookup.csv https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

In [None]:
#@title Install PySpark
!pip install --quiet pyspark

# Queries

## Q1 - Basic Statistics

Compute for each day of the week, the total number of rides, the average ride duration, cost and distance travelled.

+ We chose to solve this query three times, as required: using the Spark SQL DataFrames API, the SQL flavor of Spark SQL, and Spark Core.

The computation first extracts the day of the week from the pick-up timestamp and calculates each trip's duration in seconds. The ride data is then grouped by day of the week to compute the total number of rides and the average ride duration, cost (`total_amount`) and distance travelled (`trip_distance`).

Finally, the day names are mapped to numbers (Sunday = 0, ..., Saturday = 6) so that the results are ordered by day of the week rather than alphabetically.
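The steps above can be sketched in plain Python (not Spark) on a few hypothetical rides, to make the per-weekday aggregation and the explicit day ordering concrete. The function name `weekday_stats` and the sample values are illustrative assumptions; as in the Spark versions, the weekday is taken from the pick-up timestamp.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"
# Sunday-first ordering, like the day_map used in the DataFrame version
DAY_ORDER = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]


def weekday_stats(rides):
    """rides: iterable of (pickup, dropoff, total_amount, trip_distance) tuples.

    Returns (day, total_rides, avg_duration_s, avg_cost, avg_distance) tuples
    ordered by day of the week.
    """
    sums = {}  # weekday name -> [count, duration_sum, cost_sum, distance_sum]
    for pickup, dropoff, cost, distance in rides:
        pu_dt = datetime.strptime(pickup, FMT)
        do_dt = datetime.strptime(dropoff, FMT)
        s = sums.setdefault(pu_dt.strftime("%A"), [0, 0.0, 0.0, 0.0])
        s[0] += 1
        s[1] += (do_dt - pu_dt).total_seconds()
        s[2] += cost
        s[3] += distance
    result = []
    for day in DAY_ORDER:  # explicit order instead of alphabetical
        if day in sums:
            n, dur, cost, dist = sums[day]
            result.append((day, n, dur / n, cost / n, dist / n))
    return result


# Hypothetical sample rides (2022-12-04 was a Sunday, 2022-12-01 a Thursday)
sample = [
    ("2022-12-04 10:00:00", "2022-12-04 10:10:00", 15.0, 2.0),
    ("2022-12-04 11:00:00", "2022-12-04 11:20:00", 25.0, 4.0),
    ("2022-12-01 08:00:00", "2022-12-01 08:05:00", 10.0, 1.0),
]
stats = weekday_stats(sample)
```

Iterating over `DAY_ORDER` plays the same role as the numeric weekday column used for sorting in the Spark cells.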

In [None]:
# @title Basic Statistics using SparkSQL Dataframes API

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()
sc = spark.sparkContext

try :

    #Read Data
    rides = spark.read.csv('taxirides.csv.gz', header=True, inferSchema=True)

    #Extracting the day of the week from pickup datetime
    rides = rides.withColumn('pickup_day_of_week', date_format('tpep_pickup_datetime', 'EEEE'))

    #Converting pickup and dropoff datetime to UNIX timestamp
    rides = rides.withColumn('tpep_pickup_datetime_unix', unix_timestamp('tpep_pickup_datetime'))
    rides = rides.withColumn('tpep_dropoff_datetime_unix', unix_timestamp('tpep_dropoff_datetime'))

    #Calculating ride duration in seconds
    rides = rides.withColumn('ride_duration', abs(col('tpep_dropoff_datetime_unix') - col('tpep_pickup_datetime_unix')))
    rides = rides.drop('tpep_pickup_datetime_unix','tpep_dropoff_datetime_unix')

    #Aggregating ride data based on day of the week
    solution_not_ord = rides.groupBy('pickup_day_of_week') \
        .agg(count('*').alias('total_rides'),
        (round(avg('ride_duration'), 4)).alias('avg_ride_duration'),
        (round(avg('total_amount'),4)).alias('avg_ride_cost'),
        (round(avg('trip_distance'),4)).alias('avg_ride_distance'))

    # Mapping weekdays to numerical values
    day_map = {'Sunday': 0, 'Monday': 1, 'Tuesday': 2, 'Wednesday': 3, 'Thursday': 4, 'Friday': 5, 'Saturday': 6}
    map_weekday = udf(lambda x: day_map[x], IntegerType())
    solution_not_ord = solution_not_ord.withColumn('weekday_order', map_weekday(col('pickup_day_of_week')))

    #Ordering the resulting DataFrame by weekday in ascending order
    solutionq1 = solution_not_ord.orderBy('weekday_order', ascending=True)
    solutionq1 = solutionq1.drop('weekday_order')


    #Displaying the schema and contents of the resulting DataFrame
    solutionq1.printSchema()
    solutionq1.show()

except Exception as err:
    print(err)
finally:
    sc.stop()


In [None]:
# @title Basic Statistics using SparkSQL
# Compute for each day of the week, the total number of rides, the average ride
# duration, cost and distance travelled.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()
sc = spark.sparkContext

try:
    #Read the data
    rides = spark.read.csv(path="taxirides.csv.gz", header=True, inferSchema=True)

    #Create a temporary view
    rides.createOrReplaceTempView("RIDES")

    #SQL query to compute statistics for each day of the week
    result = spark.sql("""
        SELECT
            count(*) as total_rides,
            avg(total_amount) as avg_ride_cost,
            avg(trip_distance) as avg_ride_distance,
            avg((unix_timestamp(tpep_dropoff_datetime) - unix_timestamp(tpep_pickup_datetime)))  as avg_ride_duration,
            date_format(tpep_pickup_datetime, 'EEEE') as pickup_day_of_week,
            dayofweek(tpep_pickup_datetime) as pickup_day_num
        FROM rides
        GROUP BY pickup_day_of_week, pickup_day_num
        ORDER BY pickup_day_num
    """)

    #Create a temporary view to select columns from
    result.createOrReplaceTempView("solutionq1")

    #Select only the columns we want, in the desired order
    solutionq1 = spark.sql("""
        SELECT
           pickup_day_of_week, total_rides, avg_ride_duration, avg_ride_cost, avg_ride_distance
        FROM solutionq1
    """)


    #Displaying the result
    solutionq1.show()

except Exception as err:
    print(err)

finally:
    sc.stop()


In [None]:
# @title Basic Statistics using Spark Core
# Compute for each day of the week, the total number of rides, the average ride
# duration, cost and distance travelled.

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()
sc = spark.sparkContext

try :

  #Load the Data
  rides = sc.textFile('taxirides.csv.gz')

  #Select the first line
  header = rides.first()

  #Remove the header from the RDD
  rides = rides.filter( lambda line: line != header)


  FMT = '%Y-%m-%d %H:%M:%S'

  #Parse one CSV line into (weekday name, (count, duration, distance, cost));
  #CSV columns used: 1 = pick-up time, 2 = drop-off time, 4 = trip_distance,
  #16 = total_amount
  def parse_ride(line):
      a = line.split(',')
      pickup = datetime.strptime(a[1], FMT)
      dropoff = datetime.strptime(a[2], FMT)
      duration = dropoff.timestamp() - pickup.timestamp()
      return (pickup.strftime('%A'), (1, duration, float(a[4]), float(a[16])))

  #Sum the values per weekday, then turn the sums into
  #(total rides, avg duration, avg cost, avg distance)
  solutionq1 = rides.map(parse_ride)\
                    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2], a[3] + b[3]))\
                    .map(lambda kv: (kv[0], (kv[1][0], kv[1][1] / kv[1][0], kv[1][3] / kv[1][0], kv[1][2] / kv[1][0])))

  #Function to convert day of the week to a number for sorting later
  def day_of_week_to_number(day):
      days = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']
      return days.index(day)


  #Sort the rides by day of the week and collect the results
  sorted_rides = solutionq1.sortBy(lambda x: day_of_week_to_number(x[0])).collect()


  #Print the sorted rides
  for ride in sorted_rides:
    day_of_week, stats = ride
    print(f"Day of the week: {day_of_week}")
    print(f"Total Rides: {stats[0]}")
    print(f"Average Duration: {stats[1]:.2f} seconds")
    print(f"Average Ride Cost: ${stats[2]:.2f}")
    print(f"Average Ride Distance: {stats[3]:.2f} miles")
    print("\n")

except Exception as err:
  print(err)

finally:
  sc.stop()


## Q2 - Top-5 New York **boroughs**

Compute the top-5 most popular New York **boroughs** for pick-ups and drop-offs, for the whole month and for each day of the week separately.

Note that the question asks for the top-5 boroughs, not the top-5 zones (locations).

We therefore started by keeping only the relevant columns of the original dataframe ('rides') and creating two new columns, 'PUBorough' and 'DOBorough'. To do this, we performed two left outer joins with 'location_info', matching 'PULocationID' (respectively 'DOLocationID') against its 'LocationID' column and bringing in the corresponding 'Borough' value as 'PUBorough' (respectively 'DOBorough').

After that, we computed the day of the week of the pick-up and of the drop-off of each ride (these are not always the same, since a small fraction of rides start on one day and end on the next). For that, we used the 'dayofweek' SQL function and created two new columns, 'DoW_PU' and 'DoW_DO', which hold an integer representing the day of the week, with 1 = Sunday and 7 = Saturday.
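These two preprocessing steps can be illustrated in plain Python on a single hypothetical ride. The dictionary stands in for the left outer join (its three LocationID to Borough entries are illustrative, and `dict.get` returns `None` for an unmatched ID, much like the null a left join produces), and the helper `spark_dayofweek` (a name assumed here) reproduces Spark's `dayofweek` convention from Python's `isoweekday()`, which instead numbers Monday = 1 ... Sunday = 7.

```python
from datetime import datetime

# Illustrative lookup entries in the style of taxi+_zone_lookup.csv: LocationID -> Borough
ZONE_TO_BOROUGH = {132: "Queens", 161: "Manhattan", 236: "Manhattan"}


def spark_dayofweek(ts):
    """Day of the week with Spark's dayofweek convention: 1 = Sunday ... 7 = Saturday."""
    iso = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").isoweekday()  # Monday = 1 ... Sunday = 7
    return iso % 7 + 1


def enrich(pickup, dropoff, pu_id, do_id):
    """Add borough names and days of the week to one ride."""
    return {
        # dict.get returns None for an unmatched ID, like the null of a left outer join
        "PUBorough": ZONE_TO_BOROUGH.get(pu_id),
        "DOBorough": ZONE_TO_BOROUGH.get(do_id),
        "DoW_PU": spark_dayofweek(pickup),
        "DoW_DO": spark_dayofweek(dropoff),
    }


# A ride that starts on Saturday 2022-12-03 and ends on Sunday 2022-12-04;
# 999 is a made-up ID that is not in the lookup
example = enrich("2022-12-03 23:50:00", "2022-12-04 00:20:00", 132, 999)
```

The example ride shows why two day-of-week columns are kept: its pick-up falls on day 7 and its drop-off on day 1.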

With this preprocessed dataframe we were then able to compute and display the top-5 PU and DO boroughs for the whole month.

After that, we also calculated the top-5 pick-up and drop-off boroughs for each day of the week. To do so, we first defined a mapping from day numbers to day names. We then pivoted the data so that each borough has one count column per day, ordered the rows by these columns (Sunday first, with the later days breaking ties) and kept the first five rows, displaying one table for pick-ups and one for drop-offs.
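Because the pivoted table is ordered by the Sunday column first, the five rows it keeps are effectively Sunday's top 5. If the top 5 should instead be computed independently for every day, one option is to rank boroughs within each day. The plain-Python sketch below (hypothetical toy data and function names) shows both the whole-month ranking and a per-day ranking; in Spark, the per-day version could use `Window.partitionBy("DoW_PU").orderBy(F.desc("count"))` with `F.row_number()`, the same pattern used for destinations in Q5.

```python
from collections import Counter, defaultdict


def top_k_month(boroughs, k=5):
    """Top-k boroughs over the whole month, regardless of the day."""
    return Counter(boroughs).most_common(k)


def top_k_per_day(pairs, k=5):
    """pairs: iterable of (day_number, borough).

    Ranks the boroughs independently within each day and keeps the k most
    frequent, returning {day_number: [(borough, count), ...]}.
    """
    counts = defaultdict(Counter)
    for day, borough in pairs:
        counts[day][borough] += 1
    return {day: counts[day].most_common(k) for day in sorted(counts)}


# Toy data: (Spark dayofweek number, pick-up borough); 1 = Sunday, 2 = Monday
toy = [(1, "Manhattan"), (1, "Manhattan"), (1, "Queens"),
       (2, "Brooklyn"), (2, "Brooklyn"), (2, "Manhattan")]
month = top_k_month([b for _, b in toy], k=2)
per_day = top_k_per_day(toy, k=2)
```

In the toy data Brooklyn leads on Monday although it has no Sunday rides, which a ranking driven by the Sunday column would not reveal.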

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import DataFrame

# Create a SparkSession
spark = SparkSession.builder.master('local[*]').appName('taxis_q2').getOrCreate()
sc = spark.sparkContext

try:
    # Load the Data
    rides = spark.read.csv('taxirides.csv.gz', header=True, inferSchema=True)
    location_info = spark.read.csv('taxi+_zone_lookup.csv', header=True, inferSchema=True)

    # Step 1 - Select only the relevant columns from 'rides'
    selected_columns = [
      "tpep_pickup_datetime",
      "tpep_dropoff_datetime",
      "PULocationID",
      "DOLocationID"
    ]

    rides_selected = rides.select(selected_columns)

    # Step 2 - Join the pick-up boroughs
    rides_with_pickup_borough = rides_selected.join(
      location_info.alias("pickup_location"),
      rides_selected.PULocationID == F.col("pickup_location.LocationID"),
      "left_outer"
    ).select(
      rides_selected["*"],
      F.col("pickup_location.Borough").alias("PUBorough")
    )

    # Step 3 - Join for drop-off boroughs
    rides_with_boroughs = rides_with_pickup_borough.join(
      location_info.alias("dropoff_location"),
      rides_with_pickup_borough.DOLocationID == F.col("dropoff_location.LocationID"),
      "left_outer"
    ).select(
      rides_with_pickup_borough["*"],
      F.col("dropoff_location.Borough").alias("DOBorough")
    )

    # Step 4 - Add the days of the week for pick-up and drop-off for each ride
    # The dayofweek function returns an integer corresponding to the day of the
    # week where Sunday = 1 and Saturday = 7
    rides_with_days = rides_with_boroughs.withColumn("DoW_PU",
                                                    F.dayofweek("tpep_pickup_datetime"))
    rides_with_days = rides_with_days.withColumn("DoW_DO",
                                                F.dayofweek("tpep_dropoff_datetime"))

    # Step 5 - Drop the original timestamp columns which are no longer needed
    rides_dow_boroughs = rides_with_days.drop(
        "tpep_pickup_datetime", "tpep_dropoff_datetime")

    # Step 6 - Compute the top 5 pick-up and drop-off boroughs for the whole month
    top5_PU_boroughs_month = rides_dow_boroughs.groupBy(
        "PUBorough").count().orderBy(F.desc("count")).limit(5)
    top5_DO_boroughs_month = rides_dow_boroughs.groupBy(
        "DOBorough").count().orderBy(F.desc("count")).limit(5)

    # Step 7 - Show these results for the whole month
    print('Top 5 pick-up boroughs and respective count for the month of December 2022:')
    top5_PU_boroughs_month.show()

    print('Top 5 drop-off boroughs and respective count for the month of December 2022:')
    top5_DO_boroughs_month.show()

    # Step 8 - Define a mapping from day numbers to names
    day_mapping = {1: "Sunday", 2: "Monday", 3: "Tuesday", 4: "Wednesday",
                  5: "Thursday", 6: "Friday", 7: "Saturday"}

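    # Step 9 - Aggregate and pivot for pick-ups and use select to rename the
    # columns (so that we get the names of the days in the final result and not
    # a number). Ordering by all day columns sorts by the Sunday count first
    # (later days only break ties), so limit(5) keeps the top 5 of Sunday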
    top_PU_boroughs = (
        rides_dow_boroughs
        .groupBy("PUBorough")
        .pivot("DoW_PU", range(1, 8))
        .agg(F.count("PULocationID"))
    )

    top_PU_boroughs = top_PU_boroughs.select(
      F.col("PUBorough"),
      *[F.col(str(day_number)).alias(
          day_name) for day_number, day_name in day_mapping.items()]
    ).orderBy(*[F.col(
        day_name).desc() for day_name in day_mapping.values()]).limit(5)

    # Step 10 - A repeat of step 9 but for drop-offs instead
    top_DO_boroughs = (
        rides_dow_boroughs
        .groupBy("DOBorough")
        .pivot("DoW_DO", range(1, 8))
        .agg(F.count("DOLocationID"))
    )

    top_DO_boroughs = top_DO_boroughs.select(
      F.col("DOBorough"),
      *[F.col(str(day_number)).alias(
          day_name) for day_number, day_name in day_mapping.items()]
    ).orderBy(*[F.col(
        day_name).desc() for day_name in day_mapping.values()]).limit(5)

    # Step 11 - Show the results for pick-ups and drop-offs for each day of the
    # week (in a single table for each case)
    print('Top 5 pick-up boroughs for each day of the week and respective count:')
    top_PU_boroughs.show()
    print('Top 5 drop-off boroughs for each day of the week and respective count:')
    top_DO_boroughs.show()

except Exception as err:
    print(err)

finally:
    sc.stop()

## Q3 - Compute a list of anomalous rides.

Anomalous rides are those that deviate significantly, either in terms of cost or distance travelled, from rides that started and ended in the same zones.

To identify anomalous rides, we begin by initializing an empty list that will hold the resulting anomalous rides. Each ride is then given a unique identifier with `monotonically_increasing_id`, which produces unique (but not necessarily consecutive) values.

Next, a new dataframe computes the average and (sample) standard deviation of the trip distance and total amount for each pair of pick-up and drop-off locations. It is joined back to the rides dataframe, so that each row carries the average and standard deviation of trip distance and total amount for rides with the same pick-up and drop-off locations.
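A plain-Python sketch of these per-pair statistics, on hypothetical rides, may help. It assumes Spark's `stddev` is the sample standard deviation (`stddev_samp`), which `statistics.stdev` also computes, and that a pair with a single ride gets no standard deviation, mirroring the null that recent Spark versions return in that case. The function name `pair_stats` is an illustrative assumption.

```python
from collections import defaultdict
from statistics import mean, stdev


def pair_stats(rides):
    """rides: iterable of (PULocationID, DOLocationID, trip_distance, total_amount).

    Returns {(pu, do): (avg_distance, avg_amount, std_distance, std_amount)} using the
    sample standard deviation; a pair with a single ride gets None for both stddevs.
    """
    groups = defaultdict(list)
    for pu_id, do_id, distance, amount in rides:
        groups[(pu_id, do_id)].append((distance, amount))
    stats = {}
    for pair, rows in groups.items():
        distances = [d for d, _ in rows]
        amounts = [a for _, a in rows]
        several = len(rows) > 1  # the sample stddev needs at least two values
        stats[pair] = (
            mean(distances),
            mean(amounts),
            stdev(distances) if several else None,
            stdev(amounts) if several else None,
        )
    return stats


# Hypothetical rides: two from zone 1 to zone 2, one from zone 5 to zone 5
stats = pair_stats([(1, 2, 1.0, 10.0), (1, 2, 3.0, 14.0), (5, 5, 2.0, 8.0)])
```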

Two z-scores are then calculated, one for the trip distance and one for the total amount, as `z = (value - average) / standard deviation` using the statistics of the ride's location pair. A threshold of 3 is chosen: a ride is marked as anomalous if either of its z-scores is above 3 or below -3, i.e. if it deviates significantly in cost or in distance.
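The z-score rule can be sketched in plain Python as follows (function names are illustrative). It assumes the "either cost or distance" reading of the assignment, and treats a missing or zero standard deviation as giving no z-score, which matches Spark's null result for division by zero when ANSI mode is disabled.

```python
def zscore(value, average, std):
    """(value - average) / std, or None when std is missing or zero."""
    if std is None or std == 0:
        return None
    return (value - average) / std


def is_anomalous(z_distance, z_amount, threshold=3):
    """True if EITHER z-score lies beyond +/- threshold; a None z-score never triggers."""
    return any(z is not None and abs(z) > threshold for z in (z_distance, z_amount))


# A ride 4 standard deviations above its pair's average cost, with a typical distance
flag = is_anomalous(zscore(2.1, 2.0, 0.5), zscore(30.0, 14.0, 4.0))
```

A z-score of exactly 3 is not beyond the threshold, so such a ride is not flagged.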

Finally, the identified anomalous rides are stored in the list created at the beginning.

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()
sc = spark.sparkContext

try :
    # Make an empty list to later store anomalous rides
    anomalous_rides_id_list = []

    # Read the data
    rides = spark.read.csv('taxirides.csv.gz', header=True, inferSchema=True)

    # Assign unique IDs to each ride
    rides = rides.withColumn('ride_id', monotonically_increasing_id())

    # Calculate the average and standard deviation of trip distance and total
    # amount for each (pick-up, drop-off) location pair
    rides_same_location = rides.select('total_amount', 'trip_distance', 'PULocationID', 'DOLocationID')\
             .groupBy('PULocationID', 'DOLocationID')\
             .agg(avg('trip_distance').alias('avg_trip_distance'),
                  avg('total_amount').alias('avg_total_amount'),
                  stddev('trip_distance').alias('stddev_trip_distance'),
                  stddev('total_amount').alias('stddev_total_amount'))

    # Join the ride data with average and standard deviation data
    rides_with_avg = rides.join(rides_same_location, ['PULocationID', 'DOLocationID'], 'left')

    # Calculate z-scores for trip distance and total amount
    rides_with_zscores = rides_with_avg\
        .withColumn('trip_distance_zscore',
                    (col('trip_distance') - col('avg_trip_distance')) / col('stddev_trip_distance'))\
        .withColumn('total_amount_zscore',
                    (col('total_amount') - col('avg_total_amount')) / col('stddev_total_amount'))

    # Define threshold for z-score outliers
    threshold = 3

    # Keep the rides that deviate significantly in cost OR in distance, i.e.
    # with either z-score beyond +/- threshold
    anomalous_rides = rides_with_zscores.filter(
        (abs(col('total_amount_zscore')) > threshold) |
        (abs(col('trip_distance_zscore')) > threshold))

    # Select only the anomalous ride_id column
    anomalous_rides_id = anomalous_rides.select('ride_id')

    # Append all anomalous ride ids to the original list
    for row in anomalous_rides_id.collect():
        anomalous_rides_id_list.append(row[0])

    # Print the list of anomalous ride IDs
    print(anomalous_rides_id_list)

except Exception as err:
    print(err)

finally:
    sc.stop()

## Q4 - Find out which zones tend to generate shorter rides and which generate longer rides.

Consider a ride short or long, respectively, if its distance is more than 30% below or above the average distance for rides that originate in that zone.

In order to determine which zones tend to generate shorter or longer rides, the process begins by computing the average distance for each 'PULocationID'. Two empty lists are initialized to hold the solutions for shorter and longer ride-generating zones.

Next, the average distances are joined with the original rides dataframe, so that each row includes the average distance of its 'PULocationID'. Each ride is then classified as Short, Medium or Long: Short if its distance is below 70% of its zone's average (more than 30% less), Long if it is above 130% of that average (more than 30% more), and Medium otherwise.
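The classification can be sketched in plain Python on a hypothetical zone (the function name `classify_rides` is illustrative): the average distance is computed per pick-up zone, and each ride is compared against 0.7 and 1.3 times that average, the same factors used in the Spark code.

```python
from collections import defaultdict


def classify_rides(rides):
    """rides: list of (PULocationID, trip_distance) tuples.

    Returns a list of (PULocationID, label) where label is 'Short' if the distance is
    more than 30% below the zone's average, 'Long' if more than 30% above, else 'Medium'.
    """
    totals = defaultdict(lambda: [0.0, 0])  # zone -> [distance_sum, ride_count]
    for zone, distance in rides:
        totals[zone][0] += distance
        totals[zone][1] += 1
    average = {zone: s / n for zone, (s, n) in totals.items()}

    labels = []
    for zone, distance in rides:
        if distance < 0.7 * average[zone]:
            labels.append((zone, "Short"))
        elif distance > 1.3 * average[zone]:
            labels.append((zone, "Long"))
        else:
            labels.append((zone, "Medium"))
    return labels


# Hypothetical zone 10 with an average distance of 2.0 miles
labels = classify_rides([(10, 1.0), (10, 2.0), (10, 3.0)])
```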

Following this, the numbers of short and long rides originating from each 'PULocationID' are counted. If a 'PULocationID' has more short rides than long ones, it tends to generate shorter rides; otherwise (including ties), it is considered to generate longer rides.
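The decision rule can be sketched in plain Python on hypothetical labels (the function name `zone_tendency` is illustrative). As in the Spark code's `when(...).otherwise('Long')`, a zone is labelled 'Short' only with strictly more short than long rides, so ties, including zones with only medium rides, fall to 'Long'.

```python
from collections import Counter


def zone_tendency(labels):
    """labels: iterable of (PULocationID, 'Short' | 'Medium' | 'Long').

    Returns {zone: 'Short' | 'Long'}; ties go to 'Long'.
    """
    short_counts, long_counts = Counter(), Counter()
    zones = set()
    for zone, label in labels:
        zones.add(zone)
        if label == "Short":
            short_counts[zone] += 1
        elif label == "Long":
            long_counts[zone] += 1
    return {z: "Short" if short_counts[z] > long_counts[z] else "Long" for z in zones}


tendency = zone_tendency([(1, "Short"), (1, "Short"), (1, "Long"),
                          (2, "Short"), (2, "Long"), (3, "Medium")])
```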

Finally, the identified 'PULocationID's that tend to generate short or long rides are stored in two separate lists.

In [None]:
# Consider a ride short or long, respectively, if it less or more than 30% than
# the average distance for rides that originate in that zone.

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis').getOrCreate()
sc = spark.sparkContext

try :
    #Make two empty lists to later store the results
    long_rides = []
    short_rides = []

    #Read the data
    rides = spark.read.csv('taxirides.csv.gz', header=True, inferSchema=True)

    #Calculate average trip distance per PULocationID
    rides_pu_trip_distance = rides.select('PULocationID','trip_distance').groupBy('PULocationID').agg(avg('trip_distance').alias('avg_trip_distance'))

    #Join the average trip distance with the original rides dataframe
    rides_with_avg = rides.join(rides_pu_trip_distance,['PULocationID'],'left')

    #Label each ride Short or Long if its distance is more than 30% below or
    #above the average distance of its pick-up zone, and Medium otherwise
    rides_with_long_short = rides_with_avg.withColumn('ride_length',
                                                      when(col('trip_distance') < col('avg_trip_distance') * 0.7, 'Short')
                                                      .when(col('trip_distance') > col('avg_trip_distance') * 1.3, 'Long')
                                                      .otherwise('Medium'))

    #Group by PULocationID to count short and long rides
    rides_with_long_short = rides_with_long_short.select('PULocationID', 'ride_length') \
                        .groupBy('PULocationID') \
                        .agg(sum(when(col('ride_length') == 'Short', 1).otherwise(0)).alias('count_short'),
                         sum(when(col('ride_length') == 'Long', 1).otherwise(0)).alias('count_long'))

    #Determine whether each zone tends to generate short or long rides
    #(ties are labelled 'Long')
    rides_with_long_short = rides_with_long_short.withColumn('tends_to_generate', when(col('count_short') > col('count_long'), 'Short').otherwise('Long'))
    rides_with_long_short = rides_with_long_short.select('PULocationID','tends_to_generate')

    #Collect results and append them to the short_rides and long_rides lists
    for row in rides_with_long_short.collect():
        if row['tends_to_generate'] == 'Short':
            short_rides.append(row['PULocationID'])
        else:
            long_rides.append(row['PULocationID'])


except Exception as err:
    print(err)

finally:
    sc.stop()



In [None]:
# @title Q4 printing the results with zone name

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.master('local[*]').appName('NYCtaxis_Q4').getOrCreate()
sc = spark.sparkContext

try :
    # Make two empty lists to later store the results
    long_rides = []
    short_rides = []

    # Read the data
    rides = spark.read.csv('taxirides.csv.gz', header=True, inferSchema=True)
    location_info = spark.read.csv('taxi+_zone_lookup.csv', header=True,
                                   inferSchema=True)


    # Calculate average trip distance per PULocationID
    rides_pu_trip_distance = rides.select('PULocationID','trip_distance').groupBy('PULocationID').agg(avg('trip_distance').alias('avg_trip_distance'))

    # Join the average trip distance with the original rides dataframe
    rides_with_avg = rides.join(rides_pu_trip_distance,['PULocationID'],'left')

    # Label each ride Short or Long if its distance is more than 30% below or
    # above the average distance of its pick-up zone, and Medium otherwise
    rides_with_long_short = rides_with_avg.withColumn('ride_length',
                                                      when(col('trip_distance') < col('avg_trip_distance') * 0.7, 'Short')
                                                      .when(col('trip_distance') > col('avg_trip_distance') * 1.3, 'Long')
                                                      .otherwise('Medium'))

    # Group by PULocationID to count short and long rides
    rides_with_long_short = rides_with_long_short.select('PULocationID', 'ride_length') \
                        .groupBy('PULocationID') \
                        .agg(sum(when(col('ride_length') == 'Short', 1).otherwise(0)).alias('count_short'),
                         sum(when(col('ride_length') == 'Long', 1).otherwise(0)).alias('count_long'))

    # Determine whether each zone tends to generate short or long rides
    # (ties are labelled 'Long')
    rides_with_long_short = rides_with_long_short.withColumn('tends_to_generate', when(col('count_short') > col('count_long'), 'Short').otherwise('Long'))
    rides_with_long_short = rides_with_long_short.select('PULocationID','tends_to_generate')

    # Join with location_info to get the Zone Name
    rides_with_zone = rides_with_long_short.join(location_info, rides_with_long_short['PULocationID'] == location_info['LocationID'])

    # Collect results and append them to the short_rides and long_rides lists
    sorted_results = rides_with_zone.sort('PULocationID').collect()

    for row in sorted_results:
        zone_name = row['Zone']
        if row['tends_to_generate'] == 'Short':
            short_rides.append(row['PULocationID'])
        else:
            long_rides.append(row['PULocationID'])
        print(f"Pick-up location with ID: {row['PULocationID']} ({zone_name}) tends to generate: {row['tends_to_generate']} rides")

except Exception as err:
    print(err)

finally:
    sc.stop()

## Q5 - Find most important city zones using the Pagerank metric
Consider the graph where locations/zones (vertices) are connected by the taxi rides (edges). Locations that have many incoming rides, i.e., those that are the drop-off location for many rides, will tend to be important hubs (centers of activity) in the city. Use PageRank to find these hubs.

To that end, to simplify the graph, do not consider rides that involve "Unknown" zones. Additionally, for each zone, only consider the rides that start in that zone and end in the top-5 destinations for that zone (This will remove the edges corresponding to (src-dst) zone pairs that are not very popular).

Use the [GraphFrames API](https://graphframes.github.io/graphframes/docs/_site/index.html) and check the example below for a simple PageRank computation.

In [None]:
# @title Install GraphFrames
!pip install --quiet graphframes

In order to find the most important zones of the city, according to the defined criteria, we start by filtering our DataFrame, eliminating the rows where either "PULocationID", "DOLocationID" or both have the IDs 264 or 265, as those are the codes corresponding to "Unknown" zones.

Next, we define the vertices: the distinct location IDs that appear as either a pick-up ("PULocationID") or a drop-off ("DOLocationID") location in the filtered DataFrame, renamed to "id" as required by the GraphFrames API.
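The filtering and vertex construction can be sketched in plain Python on hypothetical (pick-up, drop-off) pairs. The set of unknown IDs follows the text above (264 and 265); the function name `build_vertices` is illustrative. The set comprehension plays the role of the `union` followed by `distinct` in the Spark code.

```python
UNKNOWN_ZONES = {264, 265}  # IDs of the "Unknown" zones, as stated above


def build_vertices(rides):
    """rides: iterable of (PULocationID, DOLocationID) pairs.

    Drops rides that touch an Unknown zone, then returns the kept rides and the
    sorted distinct IDs seen as either a pick-up or a drop-off.
    """
    kept = [(pu_id, do_id) for pu_id, do_id in rides
            if pu_id not in UNKNOWN_ZONES and do_id not in UNKNOWN_ZONES]
    vertices = sorted({zone for pair in kept for zone in pair})
    return kept, vertices


kept, vertices = build_vertices([(1, 2), (2, 3), (264, 1), (3, 265), (4, 4)])
```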

After this, we obtain the top-5 destinations ("DOLocationID") of each pick-up zone ("PULocationID"). To do this, we count the rides of each (pick-up, drop-off) pair and define a window specification, partitioned by pick-up zone and ordered by that count in descending order, which ranks each zone's drop-off locations with `row_number`. Keeping the rows with a rank of at most 5 gives a DataFrame with the top-5 destinations of every zone.
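The effect of the window ranking can be sketched in plain Python on hypothetical rides (the function name `top_destination_edges` is illustrative): counting destinations per source and keeping the k most frequent is the analogue of `row_number() <= k` over a window partitioned by source and ordered by count. Tie-breaking differs: `Counter.most_common` keeps first-seen order, whereas Spark's `row_number` does not guarantee a deterministic order among equal counts.

```python
from collections import Counter, defaultdict


def top_destination_edges(rides, k=5):
    """rides: iterable of (src, dst) zone pairs.

    Keeps the distinct (src, dst) pairs whose dst is among the k most frequent
    destinations of src.
    """
    per_src = defaultdict(Counter)
    for src, dst in rides:
        per_src[src][dst] += 1
    edges = set()
    for src, counts in per_src.items():
        for dst, _ in counts.most_common(k):
            edges.add((src, dst))
    return edges


# Zone 1 has three destinations; with k=2 the least frequent one (4) is dropped
edges = top_destination_edges([(1, 2)] * 3 + [(1, 3)] * 2 + [(1, 4)] + [(2, 1)], k=2)
```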

Subsequently, we created the edges by joining the rides with the top-destinations DataFrame, so that only (pick-up, drop-off) pairs among the top-5 destinations of their pick-up zone are retained, and keeping each distinct pair once. We also renamed the columns "PULocationID" and "DOLocationID" to "src" and "dst", respectively, following the GraphFrames API convention.

Following this, we created a GraphFrame object using the filtered vertices and edges DataFrames.

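Afterwards, we applied the PageRank algorithm to the GraphFrame, setting the reset probability to 0.15 (the complement of the 0.85 damping factor of the original PageRank formulation) and the maximum number of iterations to 50. With `maxIter`, GraphFrames runs a fixed number of iterations; stopping on convergence would instead require the `tol` parameter, and the two cannot be set together.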
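As an illustration of what this step computes, here is a minimal textbook PageRank by power iteration in plain Python, not GraphFrames' implementation. Each vertex receives the reset probability spread uniformly, every vertex passes the remaining 85% of its score equally along its out-edges, and vertices without out-edges spread theirs uniformly. Scores here sum to 1; GraphFrames may scale scores differently, so only the ordering is meant to be comparable. The toy graph is hypothetical.

```python
def pagerank(vertices, edges, reset_probability=0.15, max_iter=50):
    """Textbook PageRank run for a fixed number of iterations.

    vertices: list of ids; edges: list of distinct (src, dst) pairs.
    """
    n = len(vertices)
    out = {v: [] for v in vertices}
    for src, dst in edges:
        out[src].append(dst)
    rank = {v: 1.0 / n for v in vertices}
    d = 1.0 - reset_probability  # damping factor
    for _ in range(max_iter):
        # Score held by dangling vertices is redistributed uniformly
        dangling = sum(rank[v] for v in vertices if not out[v])
        new = {v: reset_probability / n + d * dangling / n for v in vertices}
        for src in vertices:
            for dst in out[src]:
                new[dst] += d * rank[src] / len(out[src])
        rank = new
    return rank


# Toy graph: zones 1 and 2 both send rides to hub 3, which sends rides back to 1
scores = pagerank([1, 2, 3], [(1, 3), (2, 3), (3, 1)])
```

Zone 3, the destination of both other zones, ends up with the highest score, matching the intuition that heavily visited drop-off locations are hubs.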

Then, we retrieved our results, sorted in descending order of PageRank score. Since the locations in the rides data are represented only by their "LocationID", and the mapping to the actual city boroughs and zones is available in a separate .csv file, we joined this information to our results to make them more interpretable.

Finally, we also ranked the NYC boroughs by mean PageRank (the sum of the scores of a borough's zones divided by the number of LocationIDs it has in the graph), to understand which boroughs are more important.
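The borough ranking can be sketched in plain Python with hypothetical scores and borough names "A" and "B" (the function name is illustrative): sum each borough's zone scores, divide by its number of scored zones, and sort by that mean in descending order, as the `groupBy`/`agg` step does.

```python
from collections import defaultdict


def mean_score_per_borough(scores, zone_to_borough):
    """scores: {LocationID: pagerank}; zone_to_borough: {LocationID: Borough}.

    Returns [(borough, mean_score)] sorted by mean score, highest first.
    """
    sums, counts = defaultdict(float), defaultdict(int)
    for zone, score in scores.items():
        borough = zone_to_borough.get(zone)  # None if the zone is not in the lookup
        sums[borough] += score
        counts[borough] += 1
    means = [(b, sums[b] / counts[b]) for b in sums]
    return sorted(means, key=lambda item: item[1], reverse=True)


ranking = mean_score_per_borough({1: 0.5, 2: 0.1, 3: 0.4}, {1: "A", 2: "A", 3: "B"})
```

Borough "A" has the single highest-scoring zone but still ranks second, because its mean is pulled down by a low-scoring zone.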

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from graphframes import *

import warnings
warnings.filterwarnings("ignore")

spark = SparkSession.builder.master('local[*]') \
        .config('spark.jars.packages',
                'graphframes:graphframes:0.8.3-spark3.5-s_2.12') \
        .appName('Graphframes NYC Taxi Rides').getOrCreate()
sc = spark.sparkContext

try:
  # Read the data
  rides = spark.read.csv('taxirides.csv.gz', header=True, inferSchema=True)
  location_info = spark.read.csv('taxi+_zone_lookup.csv',
                                 header=True, inferSchema=True)

  # Step 1 - Filter out rides involving "Unknown" zones, which have ids 264 or 265
  filtered_taxi_df = rides.filter(
    ~F.col("PULocationID").isin([264, 265]) &
    ~F.col("DOLocationID").isin([264, 265])
  )

  # Step 2 - Select relevant columns for vertices
  vertices = (
    filtered_taxi_df
    .selectExpr("PULocationID as id")
    .union(filtered_taxi_df.selectExpr("DOLocationID as id"))
    .distinct()
  )

  # Step 3 - Define a window specification for ranking
  window_spec = Window.partitionBy("PULocationID").orderBy(F.col(
      "count").desc())

  # Step 4 - Compute the top-5 destinations for each zone
  top_destinations = (
    filtered_taxi_df
    .groupBy("PULocationID", "DOLocationID")
    .count()
    .withColumn("rank", F.row_number().over(window_spec))
    .filter(F.col("rank") <= 5)
    .drop("count", "rank")
  )

  # Step 5 - Use the top-5 destinations to filter edges and rename columns to
  # 'src' and 'dst', to conform to the GraphFrames API convention
  edges = (
    filtered_taxi_df
    .join(top_destinations, ["PULocationID", "DOLocationID"], "inner")
    .select("PULocationID", "DOLocationID")
    .distinct()
    .withColumnRenamed("PULocationID", "src")
    .withColumnRenamed("DOLocationID", "dst")
  )

  # Step 6 - Create a Graph using GraphFrame
  graph = GraphFrame(vertices, edges)

  # Step 7 - Run the PageRank Algorithm
  pagerank_results = graph.pageRank(resetProbability=0.15, maxIter=50)

  # Step 8 - Join the PageRank results with the location information to get the
  # actual names of the Boroughs and Zones instead of only ids
  result_with_location_info = (
      pagerank_results.vertices
      .join(location_info, pagerank_results.vertices["id"] == location_info[
          "LocationID"], "left")
      .select(
          pagerank_results.vertices["id"],
          location_info["Borough"].alias("Borough"),
          location_info["Zone"].alias("Zone"),
          pagerank_results.vertices["pagerank"]
      )
  )

  # Step 9 - Get the final PageRank results with Borough and Zone information
  print('Most Important Locations of NYC according to PageRank '
        'metric with Zone name and Borough:')
  result_with_location_info.orderBy("pagerank", ascending=False).show()

  # Step 10 - Find out also which are the most relevant Boroughs by mean
  # PageRank according to how many LocationIDs each Borough contains
  mean_pagerank_per_borough = (
      result_with_location_info
      .groupBy("Borough")
      .agg(
          F.sum("pagerank").alias("sum_pagerank"),
          F.countDistinct("id").alias("num_ids")
      )
      .withColumn("mean_pagerank", F.col("sum_pagerank") / F.col("num_ids"))
      .orderBy("mean_pagerank", ascending=False)
  )

  # Step 11 - View the mean PageRank results for boroughs
  print('Most Important Boroughs of NYC by mean PageRank metric:')
  mean_pagerank_per_borough.show()

except Exception as err:
  print(err)

finally:
  sc.stop()