# SPBD Assignment 2

This notebook contains the code developed to implement the proposed solutions to this course assignment.

Developed by:
    * Lucas Fischer, nº54659
    * Joana Martins, nº54707
    
    
# IMPORTANT NOTE:

The implemented code runs locally using the sample dataset.

## Getting the dataset

The dataset can be obtained by running the command below, or by manually uploading the file to the working directory if it is already present on your machine.

# TODO: check what the correct link to obtain the data is

In [None]:
!curl -o yellow_tripdata_2018-01.csv https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-01.csv

## Creating result folders

This code removes the result folder (if it already exists) and creates a new, empty one.

In [None]:
!rm -rf spark_rdd_results && mkdir spark_rdd_results

# Setting up dependencies

The first task we must complete is setting up the right dependencies for our solution

In [None]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark import SparkContext
import traceback
import datetime
from datetime import datetime as dt
import calendar
import time
import math
import operator

# Local Spark session using every available core; reused by all the queries below
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('uberfy')
    .getOrCreate()
)

# Low-level context used by the RDD based queries
sc = spark.sparkContext

# Input file read by the queries
filename = "./data/sorted_data.csv"

## Auxiliary functions

Functions created to help in the development of this assignment

In [None]:
def create_row(line):
    """
        Function that turns a raw CSV line into the structured tuple used as a row in a RDD

        Params:
            line - A line from the input file

        Returns:
            A structured tuple with 14 positions, laid out as:

            0 - pickup_dt        7 - dropoff_lat
            1 - dropoff_dt       8 - fare_amount
            2 - trip_time        9 - tip_amount
            3 - trip_distance   10 - total_amount
            4 - pickup_long     11 - pickup_cell
            5 - pickup_lat      12 - dropoff_cell
            6 - dropoff_long    13 - taxi_id
    """
    cols = line.split(',')

    # Trip dates are kept as strings, everything else is converted to a number
    pickup_dt = cols[2]
    dropoff_dt = cols[3]
    trip_time = int(cols[4])
    trip_distance = float(cols[5])

    pickup_long = float(cols[6])
    pickup_lat = float(cols[7])
    dropoff_long = float(cols[8])
    dropoff_lat = float(cols[9])

    fare_amount = float(cols[11])
    tip_amount = float(cols[14])
    total_amount = float(cols[16])

    # Grid cells are derived from the (latitude, longitude) pairs
    pickup_cell = estimate_cellid(pickup_lat, pickup_long)
    dropoff_cell = estimate_cellid(dropoff_lat, dropoff_long)

    taxi_id = cols[0]

    return (
        pickup_dt, dropoff_dt, trip_time, trip_distance,
        pickup_long, pickup_lat, dropoff_long, dropoff_lat,
        fare_amount, tip_amount, total_amount,
        pickup_cell, dropoff_cell, taxi_id,
    )


def create_row_df(line):
    """
        Function that turns a raw CSV line into a Row object representing a row in a DataFrame

        Params:
            line - A line from the input file

        Returns:
            A Row object with the fields pickup_dt, dropoff_dt, trip_time, trip_distance,
            pickup_long, pickup_lat, dropoff_long, dropoff_lat, fare_amount, tip_amount,
            total_amount, pickup_cell, dropoff_cell and taxi_id
    """
    cols = line.split(',')

    # Trip dates are kept as strings, everything else is converted to a number
    pickup_dt = cols[2]
    dropoff_dt = cols[3]
    trip_time = int(cols[4])
    trip_distance = float(cols[5])

    pickup_long = float(cols[6])
    pickup_lat = float(cols[7])
    dropoff_long = float(cols[8])
    dropoff_lat = float(cols[9])

    fare_amount = float(cols[11])
    tip_amount = float(cols[14])
    total_amount = float(cols[16])

    # Grid cells are derived from the (latitude, longitude) pairs
    pickup_cell = estimate_cellid(pickup_lat, pickup_long)
    dropoff_cell = estimate_cellid(dropoff_lat, dropoff_long)

    taxi_id = cols[0]

    return Row(
        pickup_dt = pickup_dt,
        dropoff_dt = dropoff_dt,
        trip_time = trip_time,
        trip_distance = trip_distance,
        pickup_long = pickup_long,
        pickup_lat = pickup_lat,
        dropoff_long = dropoff_long,
        dropoff_lat = dropoff_lat,
        fare_amount = fare_amount,
        tip_amount = tip_amount,
        total_amount = total_amount,
        pickup_cell = pickup_cell,
        dropoff_cell = dropoff_cell,
        taxi_id = taxi_id,
    )


def filter_lines(line):
    """
        Function that filters out empty lines, malformed lines and lines whose pickup or
        dropoff longitude is 0.0000 (non relevant points)

        A line that is too short to hold the longitude columns, or whose longitudes are not
        numeric, is rejected instead of raising, so a single bad record cannot abort the job.

        Params:
            line - A line from the input file

        Returns:
            True if the line passed this condition, False otherwise
    """
    if not line:
        return False

    splitted_line = line.split(',')

    try:
        pickup_long = float(splitted_line[6])
        dropoff_long = float(splitted_line[8])
    except (IndexError, ValueError):
        # Truncated line or non numeric longitude: treat it as a non relevant record
        return False

    return pickup_long != 0 and dropoff_long != 0



def estimate_cellid(lat, lon):
    """
        Function that maps a coordinate pair to the ID of the grid cell containing it.
        The grid is anchored on the coordinates of cell 1.1 and made of 500 meter cells.

        Params:
            lat - Input latitude for which to find the cellID
            lon - Input longitude for which to find the cellID

        Returns:
            A String such as 'xxx.xxx' representing the ID of the cell
    """
    origin_lon = -74.913585     # longitude of cell 1.1
    origin_lat = 41.474937      # latitude of cell 1.1
    cell_size = 500             # side of a cell, in meters

    lon_per_meter = 0.005986 / 500.0        # change in longitude coordinates per meter
    lat_per_meter = 0.004491556 / 500.0     # change in latitude coordinates per meter

    # Distance from the origin, measured in cells (longitude grows eastwards, latitude shrinks southwards)
    cells_east = (lon - origin_lon) / (cell_size * lon_per_meter)
    cells_south = (origin_lat - lat) / (cell_size * lat_per_meter)

    # The half cell offset is applied before flooring; cell numbering starts at 1
    column = math.floor(0.5 + cells_east) + 1
    row = math.floor(0.5 + cells_south) + 1

    return "{}.{}".format(column, row)



def create_key_value(structured_tuple):
    """
        Function that rearranges a structured tuple into a Key-Value pair.
        The key is the (weekday, hour) of the pickup date.
        The value is a single item dictionary mapping the route ('pickup_cell-dropoff_cell')
        to a count of 1; these dictionaries are merged on the reducer.

        Params:
            structured_tuple - A tuple representing a line of the input file

        Returns:
            A tuple organized as ((weekday, hour), {route: 1})
    """
    pickup_dt = structured_tuple[0]
    key = (convert_to_weekday(pickup_dt), convert_to_hour(pickup_dt))

    # Positions 11 and 12 hold the pickup and dropoff cells
    route = "{}-{}".format(structured_tuple[11], structured_tuple[12])

    return (key, {route: 1})



def custom_reducer(accum, elem):
    """
        Custom function to be used in reduceByKey.
        This function merges two dictionaries of counters, summing the counts of the keys
        they share.

        Every item of elem is merged, not just one: reduceByKey uses this same function to
        combine partial results coming from different partitions, in which case elem is an
        already merged dictionary holding many routes.

        Params:
            accum - An accumulator dictionary (updated in place)
            elem - The dictionary of the current iteration (left untouched)

        Returns:
            The accumulator dictionary updated with information obtained by elem
    """
    for key, value in elem.items():
        accum[key] = accum.get(key, 0) + value

    return accum



def convert_to_weekday(date):
    """
        Function that maps a date to the name of its weekday

        Params:
            date - Date in string form, formatted as 'YYYY-MM-DD HH:MM:SS'

        Returns:
            A lowercase string with the weekday of the input date
    """
    parsed = dt.strptime(date, '%Y-%m-%d %H:%M:%S')

    # weekday() numbers the days from 0, matching the indexes of calendar.day_name
    weekday_index = parsed.weekday()
    weekday_name = calendar.day_name[weekday_index]

    return weekday_name.lower()



def convert_to_hour(date):
    """
        Function that extracts the hour from a date

        Params:
            date - Date in string form, formatted as 'YYYY-MM-DD HH:MM:SS'

        Returns:
            The two characters found at positions 11 and 12 of the input, i.e. the hour
            portion of a date in the format above
    """
    hour_positions = slice(11, 13)
    return date[hour_positions]



def filter_outliers(structured_tuple):
    """
        Function that filters out outlier cells. The grid starts at cell 1.1 and only extends
        to cell 300.300, so a cell with a coordinate below 1 or above 300 is an outlier.

        Params:
            structured_tuple - A tuple containing information of a line in the RDD
                               (pickup cell at position 11, dropoff cell at position 12)

        Returns:
            True if there are no outlier cells in the input tuple, False otherwise
    """
    def inside_grid(cell_id):
        # A cell ID has the form 'x.y'; both coordinates must fall inside the grid
        cell_x, cell_y = cell_id.split(".")
        return (1 <= float(cell_x) <= 300) and (1 <= float(cell_y) <= 300)

    return inside_grid(structured_tuple[11]) and inside_grid(structured_tuple[12])

# Query 1

TODO: describe the first query here

In [None]:
# Query 1: for every (weekday, hour) pair, find the 10 most frequent routes
# (pickup cell - dropoff cell) and store them under spark_rdd_results/query1.
try:

    # Timestamp used to measure the time taken
    time_before = dt.now()

    # Read the csv file (point `filename` to the full dataset instead of just the sample)
    raw_data = sc.textFile(filename)

    # Keep only the non empty lines whose pickup and dropoff longitudes are not 0
    non_empty_lines = raw_data.filter(filter_lines)

    # Shape the rdd rows into structured tuples
    fields = non_empty_lines.map(create_row)

    # Drop the rows whose cells are considered outliers by filter_outliers
    # (grid definition stated in http://debs.org/debs-2015-grand-challenge-taxi-trips/)
    filtered_rdd = fields.filter(filter_outliers)

    # ((weekday, hour), {route: 1})
    organized_lines = filtered_rdd.map(create_key_value)

    # Group all values by key, merging the route counters with custom_reducer
    grouped = organized_lines.reduceByKey(custom_reducer)

    # Sort each dictionary by descending count and keep only the 10 most frequent routes
    top_routes = grouped.mapValues(lambda route_dict: sorted(route_dict, key = route_dict.get, reverse = True)[:10])

    # The result feeds two actions (save and take); cache it so the pipeline runs only once
    top_routes.cache()

    # Store the retrieved results
    top_routes.saveAsTextFile("spark_rdd_results/query1")

    for a in top_routes.take(2):
        print(a)

    time_after = dt.now()
    seconds = (time_after - time_before).total_seconds()
    print("Execution time {} seconds".format(seconds))

    # The SparkContext is not stopped here: the following queries reuse it
except Exception:
    # Report the failure without propagating it; unlike a bare except,
    # this does not swallow KeyboardInterrupt / SystemExit
    traceback.print_exc()

# Query 2

TODO: describe query 2 here

In [None]:
# Query 2: rank the pickup cells by the average fare + tip of their trips, grouped by
# 15 minute dropoff window, weekday and hour, and store the ranking under
# spark_rdd_results/query2.
try:

    # Timestamp used to measure the time taken
    time_before = dt.now()

    # UDFs extracting the weekday name and the hour from a date string
    convert_to_weekday_udf = udf(convert_to_weekday, StringType())
    convert_to_hour_udf = udf(convert_to_hour, StringType())

    # Read the csv file (point `filename` to the full dataset instead of just the sample)
    raw_data = sc.textFile(filename)

    # Keep only the non empty lines whose pickup and dropoff longitudes are not 0
    non_empty_lines = raw_data.filter(filter_lines)

    # Shape the rdd rows into Row objects
    fields = non_empty_lines.map(create_row_df)

    # Create the DataFrame
    lines_df = spark.createDataFrame(fields)

    # Drop the rows whose pickup or dropoff cell lies outside the 300x300 grid
    # (stated in http://debs.org/debs-2015-grand-challenge-taxi-trips/).
    # A cell ID has the form 'x.y', so both coordinates are compared numerically:
    # a substring regex such as "3\d\d" would reject the valid cell 300 while
    # letting cells like 400 or negative cells through.
    pickup_parts = split(lines_df.pickup_cell, r"\.")
    dropoff_parts = split(lines_df.dropoff_cell, r"\.")
    inside_grid = (
        pickup_parts.getItem(0).cast("int").between(1, 300)
        & pickup_parts.getItem(1).cast("int").between(1, 300)
        & dropoff_parts.getItem(0).cast("int").between(1, 300)
        & dropoff_parts.getItem(1).cast("int").between(1, 300)
    )
    filtered_df = lines_df.filter(inside_grid)

    # Group the trips by 15 minute dropoff window, weekday, hour and pickup cell,
    # then rank the groups by their average fare + tip (highest first)
    profit_by_area_15min = filtered_df \
        .groupBy(window("dropoff_dt", "900 seconds"), convert_to_weekday_udf("pickup_dt").alias("weekday"), convert_to_hour_udf("pickup_dt").alias("hour"), "pickup_cell") \
        .agg(avg(filtered_df.fare_amount + filtered_df.tip_amount).alias("avg_fare")) \
        .orderBy("avg_fare", ascending = False) \
        .select("weekday", "hour", "pickup_cell")

    # TODO: count the empty taxis per dropoff cell, e.g.
    #   filtered_df.groupBy(window("dropoff_dt", "900 seconds"), "dropoff_cell")
    #              .agg(countDistinct("taxi_id").alias("empty_taxis"))
    #              .select("dropoff_cell", "empty_taxis")

    profit_by_area_15min.show(2)

    # Store the results as ((weekday, hour), pickup_cell)
    profit_by_area_15min.rdd.map(lambda row: ((row.weekday, row.hour), row.pickup_cell)).saveAsTextFile("spark_rdd_results/query2")

    time_after = dt.now()
    seconds = (time_after - time_before).total_seconds()
    print("Execution time {} seconds".format(seconds))

    # The SparkContext is not stopped here: the following query reuses the session
except Exception:
    # Report the failure without propagating it; unlike a bare except,
    # this does not swallow KeyboardInterrupt / SystemExit
    traceback.print_exc()


# Query 3

TODO: describe query 3 here

In [None]:
# Query 3: cluster the pickup coordinates with K-Means for several values of k and
# compare the resulting clusterings through their Silhouette score.

# Column layout of the input file (read without a header row)
schema = StructType([
        StructField("medallion", StringType()),
        StructField("hack_license", StringType()),
        StructField("pickup_datetime", StringType()),
        StructField("dropoff_datetime", StringType()),
        StructField("trip_time_in_secs", IntegerType()),
        StructField("trip_distance", FloatType()),
        StructField("pickup_longitude", FloatType()),
        StructField("pickup_latitude", FloatType()),
        StructField("dropoff_longitude", FloatType()),
        StructField("dropoff_latitude", FloatType()),
        StructField("payment_type", StringType()),
        StructField("fare_amount", FloatType()),
        StructField("surcharge", FloatType()),
        StructField("mta_tax", FloatType()),
        StructField("tip_amount", FloatType()),
        StructField("tolls_amount", FloatType()),
        StructField("total_amount", FloatType())
    ])

# Load and parse the data (same input file as the other queries)
data = spark.read.schema(schema).option("header", "false").csv(filename)

# Keep only the rows with usable pickup coordinates:
#   - missing values cannot be assembled into a feature vector
#   - 0.0 coordinates are non relevant points (see filter_lines) and would distort the clusters
valid_pickups = data \
    .dropna(subset = ["pickup_latitude", "pickup_longitude"]) \
    .filter((col("pickup_latitude") != 0) & (col("pickup_longitude") != 0))

# Define the target columns and output column
assembler = VectorAssembler(
    inputCols = ["pickup_latitude", "pickup_longitude"],
    outputCol = "features"
    )

# Transform the data according to the Assembler created above
data_prepared = assembler.transform(valid_pickups)

# Class used to evaluate the clusters
evaluator = ClusteringEvaluator()


for i in [5, 31, 75]: #find other k values

    # Instantiate the KMeans class with the given K value.
    # The seed is fixed so that the scores of different runs can be compared.
    kmeans = KMeans(k = i, seed = 42)

    # Fit the data
    model = kmeans.fit(data_prepared)

    # Make predictions on fitted data
    predictions = model.transform(data_prepared)

    # Evaluate clustering by computing Silhouettes score
    silhouette = evaluator.evaluate(predictions)

    #TODO get position of prototype with best silhouette score (probably going to be the last iteration)

    # The closer the silhouette score is to 1, the tighter the points of the same cluster are and the farther they are from other clusters
    # This is optimal because it means that points will all be close to just one taxi stand (saving unnecessary money to create another one)
    print(f"{i} -> {silhouette}")