# [CSC8101] Engineering for AI - 2024 Spark Coursework

## Coursework overview

### Inputs

- **NYC Taxi Trips dataset** - list of recorded taxi trips, each with several characteristics, namely: distance, number of passengers, origin zone, destination zone and trip cost (total amount charged to customer).
- **NYC Zones dataset** - list of zones wherein trips can originate/terminate.

### Tasks

1. Data cleaning
  1. Remove "0 distance" and 'no passengers' records.
  2. Remove outlier records. 
2. Add new columns
  1. Join with zones dataset
  2. Compute the unit profitability of each trip
3. Zone summarisation and ranking
  1. Summarise trip data per zone
  2. Obtain the top 10 ranks according to:
    1. The total trip volume
    2. Their average profitabilitiy
    3. The total passenger volume
4. Record the total and task-specific execution times for each dataset size and format.

### How to

###### Code structure and implementation

- You must implement your solution to each task in the provided function code skeleton.
- The task-specific functions are combined together to form the full pipeline code, executed last (do not modify this code).
- Before implementing the specified function skeleton, you should develop and test your solution on separate code cells (create and destroy cells as needed).

###### Development

- Develop an initial working solution for the 'S' dataset and only then optimise it for larger dataset sizes.
- To perform vectorised operations on a DataFrame:
  - use the API docs to look for existing vectorised functions in: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
  - actions to get around the lazy execution of spark: 
  https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
  - if a customised function is required (e.g. to add a new column based on a linear combination of other columns), implement your own User Defined Function (UDF). See:  https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
- Use only the `pyspark.sql` API - documentation link below - (note that searching through the docs returns results from the `pyspark.sql` API together with the `pyspark.pandas` API):
  - https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.sql.html
- Periodically download your notebook to your computer as backup and safety measure against accidental file deletion.
 
###### Execution time measurement

- Execution time is calculated and returned by the Spark Engine and shown in the output region of the cell.
- To measure the execution time of a task you must perform a `collect` or similar operation (e.g. `take`) on the returned DataFrame.

## Task 0 - Read data

The code below is ready to run. **Do not modify this code**. It does the following:

- Reads the 'zones' dataset into variable 'zone_names'
- Defines the `init_trips` function that allows you to read the 'trips' dataset (from the DBFS FileStore) given the dataset size ('S' to 'XXL') and format ('parquet' or 'delta') as function arguments
- Defines the `pipeline` function, called in Task 4 to measure the execution time of the entire data processing pipeline
- Shows you how to call the `init_trips` function and display dataset characteristics (number of rows, schema)

In [0]:
## global imports
import pyspark.sql as ps
import pyspark.sql.functions as pf
import pandas as pd

# Load zone names dataset - (much faster to read small file from git than dbfs)
zones_file_url = 'https://raw.githubusercontent.com/mutazb999/CSC8101-lab-and-coursework/main/02-assignment-spark/taxi_zone_names.csv'
zone_names = spark.createDataFrame(pd.read_csv(zones_file_url))

# Function to load trips dataset by selected dataset size
def init_trips(size = 'S', data_format = "parquet", taxi_folder = "/FileStore/tables/taxi"):     
    
    files = {
        'S'  : ['2021_07'],
        'M'  : ['2021'],
        'L'  : ['2020_21'],
        'XL' : ['1_6_2019', '7_12_2019'],
        'XXL': ['1_6_2019', '7_12_2019', '2020_21']
    }
    
    # validate input dataset size
    if size not in files.keys():
        print("Invalid input dataset size. Must be one of {}".format(list(files.keys())))
        return None               
    
    if data_format == "parquet":
        filenames = list(map(lambda s: f'{taxi_folder}/parquet/tripdata_{s}.parquet', files[size]))
        trips_df = spark.read.parquet(filenames[0])
        
        for name in filenames[1:]:
            trips_df = trips_df.union(spark.read.parquet(name))
            
    elif data_format == "delta":
        filenames = f"{taxi_folder}/delta/taxi-{size}-delta/"
        trips_df = spark.read.format("delta").load(filenames)
    
    else:
        print("Invalid data format. Must be one of {}".format(['parquet', 'delta']))
        return None
        
    print(
    """
    Trips dataset loaded!
    ---
      Size: {s}
      Format: {f}
      Tables loaded: {ds}
      Number of trips (dataset rows): {tc:,}
    """.format(s = size, f = data_format, ds = filenames, tc = trips_df.count()))
    
    return trips_df

# helper function to print dataset row count
def print_count(df):
    print("Row count: {t:,}".format(t = df.count()))

def pipeline(trips_df, with_task_12 = False, zones_df = zone_names):
    # Do not edit
    #---

    ## Task 1.1
    _trips_11 = t11_remove_zeros(trips_df)

    ## Task 1.2
    if with_task_12:
        _trips_12 = t12_remove_outliers(_trips_11)
    else:
        _trips_12 = _trips_11

    ## Task 2.1
    _trips_21 = t21_join_zones(_trips_12, zones_df = zone_names)

    ## Task 2.2
    _trips_22 = t22_calc_profit(_trips_21)

    ## Task 3.1
    _graph = t31_summarise_trips(_trips_22)

    ## Task 3.2
    _zones = t32_summarise_zones_pairs(_graph)

    _top10_trips     = t32_top10_trips(_zones)
    _top10_profit    = t32_top10_profit(_zones)
    _top10_passenger = t32_top10_passenger(_zones)
    
    return([_top10_trips, _top10_profit, _top10_passenger])

In [0]:
# CHANGE the value of argument 'size' to record the pipeline execution times for increasing dataset sizes
SIZE = 'S'
DATA_FORMAT = 'parquet'

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)

# uncomment line only for small datasets
# trips.take(1)


    Trips dataset loaded!
    ---
      Size: S
      Format: parquet
      Tables loaded: ['/FileStore/tables/taxi/parquet/tripdata_2021_07.parquet']
      Number of trips (dataset rows): 2,898,033
    


In [0]:
print_count(trips)

Row count: 2,898,033


In [0]:
# dataset schemas
trips.printSchema()

root
 |-- index: long (nullable = true)
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |--

In [0]:
display(trips[['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']].take(5))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
142,238,2.0,0.0,12.3


In [0]:
zone_names.printSchema()

root
 |-- LocationID: long (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [0]:
display(zone_names.take(5))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone


## Task 1 - Filter rows

**Input:** trips dataset

### Task 1.1 - Remove "0 distance" and 'no passengers' records

Remove dataset rows that represent invalid trips:

- Trips where `trip_distance == 0` (no distance travelled)
- Trips where `passenger_count == 0` and `total_amount == 0` (we want to retain records where `total_amount` > 0 - these may be significant as the taxi may have carried some parcel, for example)

Altogether, a record is removed if it satisfies the following conditions:

`trip_distance == 0` or `(passenger_count == 0` and `total_amount == 0)`.

**Recommended:** Select only the relevant dataset columns for this and subsequent tasks: `['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')]`

### Task 1.2 - Remove outliers using the modified z-score

Despite having removed spurious "zero passengers" trips in task 1.1, columns `total_amount` and `trip_distance` contain additional outlier values that must be identified and removed.

To identify and remove outliers, you will use the modified [z-score](https://en.wikipedia.org/wiki/Standard_score) method.
The modified z-score uses the median and [Median Absolute Deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (MAD), instead of the mean and standard deviation, to determine how far an observation (indexed by i) is from the mean:

$$z_i = \frac{x_i - \mathit{median}(\mathbf{x})}{\mathbf{MAD}},$$

where x represents the input vector, xi is an element of x and zi is its corresponding z-score. In turn, the MAD formula is:

$$\mathbf{MAD} = 1.483 * \mathit{median}(\big\lvert x_i - \mathit{median}(\mathbf{x})\big\rvert).$$

Observations with **high** (absolute) z-score are considered outlier observations. A score is considered **high** if its __absolute z-score__ is larger than a threshold T = 3.5:

$$\big\lvert z_i \big\rvert > 3.5.$$

where T represents the number of unit standard deviations beyond which a score is considered an outlier ([wiki](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)).

This process is repeated twice, once for each of the columns `total_amount` and `trip_distance` (in any order).

**Important:** Use the surrogate function [`percentile_approx`](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.percentile_approx.html?highlight=percentile#pyspark.sql.functions.percentile_approx) to estimate the median (calculating the median values for a column is expensive as it cannot be parallelised efficiently).

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

In [0]:
# Your solution implementation to task 1.1 goes HERE

# Logic
# Trip Distance Check: The condition col("trip_distance") != 0 ensures that we exclude trips where the recorded distance is zero. Trips with zero distance are either data errors or represent trips that were booked but not actually taken.

# Passenger Count and Total Amount Check: The condition ~((col("passenger_count") == 0) & (col("total_amount") == 0)) serves to remove records where there were no passengers (passenger_count == 0) and no charge (total_amount == 0). However, it's important to retain records where total_amount > 0 even if passenger_count == 0, as these might represent valid scenarios like parcel deliveries.

from pyspark.sql.functions import col

def t11_remove_zeros(df):
    # Filter out rows where trip distance is 0 or both passenger count and total amount are 0
    return df.filter((col("trip_distance") != 0) & ~((col("passenger_count") == 0) & (col("total_amount") == 0)))


In [0]:
# execute task 1.1
trips_11 = t11_remove_zeros(trips)

print_count(trips_11)

## uncomment only for smaller datasets
# display(trips_11.take(10))

Row count: 2,858,164


In [0]:
# Your solution implementation to task 1.2 goes HERE

# Logic

# Median and MAD Calculation:
# The nested function calculate_mad computes the Median Absolute Deviation (MAD) for a specified column. MAD is a measure of variability that is more robust to outliers than standard deviation.
# The median and MAD are calculated for the total_amount and trip_distance columns. The MAD is scaled by a factor of 1.483 to make it consistent with the standard deviation for a normal distribution (This number was given in the instructions).

# Modified Z-Score Calculation:
# The modified Z-score for each data point in the specified columns is calculated. It measures how far (in terms of MAD) a data point is from the median.

# Filtering Outliers:
# Data points with an absolute modified Z-score greater than the threshold (3.5 in this case) are considered outliers and are filtered out.
# The threshold of 3.5 is a typical choice for identifying outliers.

from pyspark.sql.functions import abs

def t12_remove_outliers(df):
    
    def calculate_mad(df, column_name):
        # Calculate the median of the specified column
        median = df.approxQuantile(column_name, [0.5], 0.01)[0]

        # Calculate the MAD and apply scaling factor
        mad = df.withColumn('abs_diff', abs(col(column_name) - median)) \
                .approxQuantile('abs_diff', [0.5], 0.01)[0]
        mad_scaled = 1.483 * mad  # Apply the scaling factor
        return median, mad_scaled

    # Define the threshold for the modified Z-score
    threshold = 3.5

    # Iterate over each column to remove outliers
    for column in ['total_amount', 'trip_distance']:
        # Calculate median and scaled MAD for the column
        median, mad_scaled = calculate_mad(df, column)

        # Calculate modified Z-Score and filter the DataFrame
        z_score_expr = (col(column) - median) / mad_scaled
        df = df.filter(abs(z_score_expr) <= threshold)

    return df

In [0]:
# execute task 1.2
trips_12 = t12_remove_outliers(trips_11)

print_count(trips_12)
# display(trips_12.limit(10))

Row count: 2,435,246


## Task 2 - Compute new columns

### Task 2.1 - Zone names

Obtain the **start** and **end** zone names of each trip by joining the `trips` and `zone_names` datasets (i.e. by using the `zone_names` dataset as lookup table).

**Note:** The columns containing the start and end zone ids of each trip are named `PULocationID` and `DOLocationID`, respectively.

### Task 2.2 - Unit profitability

Compute the column `unit_profitability = total_amount / trip_distance`.

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below
# Review the schema of the trips DataFrame
trips_12.printSchema()

# Review the schema of the zone_names DataFrame
zone_names.printSchema()

root
 |-- index: long (nullable = true)
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |--

In [0]:
# Your solution implementation to task 2.1 goes HERE

# Logic 

# Initial Join for Pickup Locations:
# The trips dataset (trips_df) is joined with the zone names dataset (zones_df) on the PULocationID column. This join operation maps each pickup location ID to its corresponding zone name.
# The Zone and Borough columns from zones_df are renamed to PUZone and PUBorough respectively, to explicitly indicate that these refer to the pickup locations. The join is a left join, ensuring that all records in trips_df are retained, even if no matching zone name is found.

#Subsequent Join for Dropoff Locations:
# A second join is performed on the result of the first join, this time using the DOLocationID column. This associates each dropoff location ID with its zone name.
# Again, the Zone and Borough columns are renamed, now to DOZone and DOBorough, to denote dropoff locations. As with the first join, a left join is used to maintain all trip records.

# Handling Duplicate Columns:
# After each join, the function drops the LocationID and service_zone columns that come from zones_df to avoid duplicate and unnecessary information in the resulting DataFrame.

def t21_join_zones(trips_df, zones_df=zone_names):
    # Perform the join for pickup locations
    trips_with_pu = trips_df.join(zones_df, trips_df.PULocationID == zones_df.LocationID, "left") \
                            .withColumnRenamed("Zone", "PUZone") \
                            .withColumnRenamed("Borough", "PUBorough") \
                            .drop("LocationID", "service_zone")  # Drop duplicate columns after join

    # Perform the join for dropoff locations
    trips_with_pu_do = trips_with_pu.join(zones_df, trips_with_pu.DOLocationID == zones_df.LocationID, "left") \
                                    .withColumnRenamed("Zone", "DOZone") \
                                    .withColumnRenamed("Borough", "DOBorough") \
                                    .drop("LocationID", "service_zone")  # Drop duplicate columns after join

    return trips_with_pu_do


In [0]:
# execute task 2.1
trips_21 = t21_join_zones(trips_12, zones_df = zone_names)

print_count(trips_21)
#display(trips_21.take(10))

Row count: 2,435,246


In [0]:
# Your solution implementation to task 2.2 goes HERE

# Logic

# The logic behind this is that the  unit profitability is calculated as the ratio of total_amount to trip_distance. This ratio represents the average amount of money earned per unit of distance traveled. The formula has been given to us in the instructions therefore the implementation is straightforward.

def t22_calc_profit(df):
    # Add the new column 'unit_profitability'
    df = df.withColumn("unit_profitability", col("total_amount") / col("trip_distance"))
    return df

In [0]:
# execute task 2.2
trips_22 = t22_calc_profit(trips_21)

print_count(trips_22)
# display(trips_22.take(10))

Row count: 2,435,246


In [0]:
# Display the first 10 rows of the DataFrame
trips_22.show(10)

+-----+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+--------+--------------------+---------------------+---------+---------+---------+--------------------+---------+--------------------+------------------+
|index|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|cab_type|lpep_pickup_datetime|lpep_dropoff_datetime|ehail_fee|trip_type|PUBorough|              PUZone|DOBorough|              DOZone|unit_profitability|
+-----+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+------

## Task 3: Rank zones by traffic, passenger volume and profitability

### 3.1 - Summarise interzonal travel

Build a graph data structure of zone-to-zone traffic, representing aggregated data about trips between any two zones. The graph will have one node for each zone and one edge connecting each pair of zones. In addition, edges contain aggregate information about all trips between those zones. 

For example, zones Z1 and Z2 are connected by *two* edges: edge Z1 --> Z2 carries aggregate data about all trips that originated in Z1 and ended in Z2, and edge Z2 --> Z1 carries aggregate data about all trips that originated in Z2 and ended in Z1.

The aggregate information of interzonal travel must include the following data:

- `average_unit_profit` - the average unit profitability (calculated as `mean(unit_profitability)`).
- `trips_count` -- the total number of recorded trips.
- `total_passengers` -- the total number of passenger across all trips (sum of `passenger_count`).

This graph can be represented as a new dataframe, with schema:

\[`PULocationID`, `DOLocationID`, `average_unit_profit`, `trips_count`, `total_passengers` \]

__hint__: the `groupby()` operator produces a `pyspark.sql.GroupedData` structure. You can then calculate multiple aggregations from this using `pyspark.sql.GroupedData.agg()`: 
- https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.groupby.html
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.agg.html

### Task 3.2 - Obtain top-10 zones

For each of the following measures, report the top-10 zones _using their plain names you dereferenced in the previous step, not the codes_. Note that this requires ranking the nodes in different orders. Specifically, you need to calculate the following further aggregations:

- the **total** number of trips originating from Z. This is simply the sum of `trips_count` over all outgoing edges for Z, i.e., edges of the form Z -> \*
- the **average** profitability of a zone. This is the average of all `average_unit_profit` over all *outgoing* edges from Z.
- The **total** passenger volume measured as the **sum** of `total_passengers` carried in trips that originate from Z

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below
from pyspark.sql import functions as F

In [0]:
## Your solution to task 3.1 goes HERE

# Logic

# Grouping by Pickup and Dropoff Locations:
# The function groups the data by PULocationID and DOLocationID. Each group represents a unique pair of zones, corresponding to the start and end points of trips.
# This grouping effectively creates the edges of the graph, where each edge connects a pickup location (node) to a dropoff location (node).

# Aggregating Key Metrics:
# For each pair of zones, the following metrics are aggregated:
    # F.mean("unit_profitability").alias("average_unit_profit"): Calculates the average unit profitability of trips between each pair of zones. This metric helps understand the financial efficiency of trips on each route.
    # F.count("*").alias("trips_count"): Counts the total number of trips that occurred between each pair of zones. This gives an idea of the traffic volume or popularity of each route.
    # F.sum("passenger_count").alias("total_passengers"): Sums the total number of passengers for all trips between each pair of zones. This indicates the overall passenger load on each route.

# Resulting DataFrame - graph_df:
# The resulting DataFrame graph_df contains columns for PULocationID, DOLocationID, average_unit_profit, trips_count, and total_passengers.
# Each row in graph_df provides a comprehensive snapshot of the traffic, financial efficiency, and passenger volume for trips between each unique pair of zones.

def t31_summarise_trips(df):
    # Aggregate data for trips between zones
    graph_df = df.groupBy("PULocationID", "DOLocationID").agg(
        F.mean("unit_profitability").alias("average_unit_profit"),
        F.count("*").alias("trips_count"),
        F.sum("passenger_count").alias("total_passengers")
    )

    return graph_df

In [0]:
# execute task 3.1
graph = t31_summarise_trips(trips_22)

print_count(graph)
# display(graph.take(10))

Row count: 11,833


In [0]:
display(graph.take(10))

PULocationID,DOLocationID,average_unit_profit,trips_count,total_passengers
228,26,7.41471058428216,8,2.0
108,26,6.236111111111111,1,1.0
25,26,4.722645989088705,2,3.0
21,29,8.13479934900303,9,1.0
97,26,4.504891363110619,9,10.0
22,26,15.249447857883409,32,2.0
178,26,9.437984319433491,9,3.0
26,26,19.89301966792171,86,11.0
181,26,6.467595978916211,11,6.0
87,29,6.241486068111455,1,1.0


In [0]:
# Your solution to task 3.2 goes HERE (implement each of the functions below)

# Logic 

# Function t32_summarise_zones_pairs:
# Currently, this function simply returns the input DataFrame df. If additional summarization or processing is required, it should be implemented here. Otherwise, this function may not be necessary.

# Function t32_top10_trips (Top 10 Zones by Trip Volume):
# Aggregates the total number of trips originating from each zone (PULocationID).
# Ranks the zones by the aggregated total_trips in descending order and selects the top 10.
# Joins the resulting DataFrame with zone_names to associate each PULocationID with its readable zone name.

# Function t32_top10_profit (Top 10 Zones by Profit):
# Calculates the average profitability (average_unit_profit) for each originating zone.
# Orders the zones by average_profit in descending order and picks the top 10.
# Joins the resulting DataFrame with zone_names for readable zone names.

# Function t32_top10_passenger (Top 10 Zones by Passenger Volume):
# Aggregates the total passenger volume (total_passengers) for each originating zone.
# Sorts the zones by total_passengers in descending order and selects the top 10.
# Performs a join with zone_names to get the readable names of the zones.

def t32_summarise_zones_pairs(df, zones_df = zone_names):
    return df

# Top 10 ranked zones by traffic (trip volume)
def t32_top10_trips(df_zones):
    top_zones_by_trips = df_zones.groupBy("PULocationID").agg(
        F.sum("trips_count").alias("total_trips")
    ).orderBy(F.desc("total_trips")).limit(10)

    return top_zones_by_trips.join(zone_names, top_zones_by_trips.PULocationID == zone_names.LocationID, "left")


# Top 10 ranked zones by profit
def t32_top10_profit(df_zones):
    top_zones_by_profit = df_zones.groupBy("PULocationID").agg(
        F.mean("average_unit_profit").alias("average_profit")
    ).orderBy(F.desc("average_profit")).limit(10)

    return top_zones_by_profit.join(zone_names, top_zones_by_profit.PULocationID == zone_names.LocationID, "left")


# Top 10 ranked zones by passenger volume
def t32_top10_passenger(df_zones):
    top_zones_by_passengers = df_zones.groupBy("PULocationID").agg(
        F.sum("total_passengers").alias("total_passengers")
    ).orderBy(F.desc("total_passengers")).limit(10)

    return top_zones_by_passengers.join(zone_names, top_zones_by_passengers.PULocationID == zone_names.LocationID, "left")

In [0]:
# execute task 3.2
zones = t32_summarise_zones_pairs(graph)

top10_trips     = t32_top10_trips(zones)
top10_profit    = t32_top10_profit(zones)
top10_passenger = t32_top10_passenger(zones)

In [0]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_trips)

PULocationID,total_trips,LocationID,Borough,Zone,service_zone
48,73585,48,Manhattan,Clinton East,Yellow Zone
141,73531,141,Manhattan,Lenox Hill West,Yellow Zone
142,80200,142,Manhattan,Lincoln Square East,Yellow Zone
161,92794,161,Manhattan,Midtown Center,Yellow Zone
162,84103,162,Manhattan,Midtown East,Yellow Zone
170,87410,170,Manhattan,Murray Hill,Yellow Zone
186,95864,186,Manhattan,Penn Station/Madison Sq West,Yellow Zone
236,102930,236,Manhattan,Upper East Side North,Yellow Zone
237,121281,237,Manhattan,Upper East Side South,Yellow Zone
239,73739,239,Manhattan,Upper West Side South,Yellow Zone


In [0]:
# use 'display()' return a pandas DataFrame for 'pretty' output
display(top10_profit)

PULocationID,average_profit,LocationID,Borough,Zone,service_zone
10,106.87455960856856,10,Queens,Baisley Park,Boro Zone
15,373.64352610622007,15,Queens,Bay Terrace/Fort Totten,Boro Zone
59,36.84437137879828,59,Bronx,Crotona Park,Boro Zone
81,35.018824815920645,81,Bronx,Eastchester,Boro Zone
89,34.99531722605364,89,Brooklyn,Flatbush/Ditmas Park,Boro Zone
101,48.41059594606823,101,Queens,Glen Oaks,Boro Zone
115,38.65,115,Staten Island,Grymes Hill/Clifton,Boro Zone
190,95.73464655984552,190,Brooklyn,Prospect Park,Boro Zone
207,103.12501378067476,207,Queens,Saint Michaels Cemetery/Woodside,Boro Zone
241,59.39185912313199,241,Bronx,Van Cortlandt Village,Boro Zone


In [0]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_passenger)

PULocationID,total_passengers,LocationID,Borough,Zone,service_zone
48,105804.0,48,Manhattan,Clinton East,Yellow Zone
142,113689.0,142,Manhattan,Lincoln Square East,Yellow Zone
161,133614.0,161,Manhattan,Midtown Center,Yellow Zone
162,118328.0,162,Manhattan,Midtown East,Yellow Zone
170,122448.0,170,Manhattan,Murray Hill,Yellow Zone
186,135960.0,186,Manhattan,Penn Station/Madison Sq West,Yellow Zone
234,104345.0,234,Manhattan,Union Sq,Yellow Zone
236,142914.0,236,Manhattan,Upper East Side North,Yellow Zone
237,169119.0,237,Manhattan,Upper East Side South,Yellow Zone
239,104464.0,239,Manhattan,Upper West Side South,Yellow Zone


## Task 4 - Record the pipeline's execution time

Record the execution time of:

1. the whole pipeline
2. the whole pipeline except task 1.2

on the two tables below, for all dataset sizes: `'S'`, `'M'`, `'L'`, `'XL'`, `'XXL'`, and data formats: `parquet` and `delta`.

Analyse the resulting execution times and comment on the effect of dataset size, dataset format and task complexity (with and without task 1.2) on pipeline performance.

In [0]:
# CHANGE the value of the following arguments to record the pipeline execution times for increasing dataset sizes
SIZE = 'XXL'
DATA_FORMAT = 'delta'
WITH_TASK_12 = False

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)


    Trips dataset loaded!
    ---
      Size: XXL
      Format: delta
      Tables loaded: /FileStore/tables/taxi/delta/taxi-XXL-delta/
      Number of trips (dataset rows): 132,396,785
    


In [0]:
# run and record the resulting execution time shown by databricks (on the cell footer)

# IMPORTANT: this function calls all task functions in order of occurrence. For this code to run without errors, you have to load into memory all of the previous task-specific functions, even if you haven't implemented these yet.
pipeline(trips, with_task_12 = WITH_TASK_12)

[DataFrame[PULocationID: bigint, total_trips: bigint, LocationID: bigint, Borough: string, Zone: string, service_zone: string],
 DataFrame[PULocationID: bigint, average_profit: double, LocationID: bigint, Borough: string, Zone: string, service_zone: string],
 DataFrame[PULocationID: bigint, total_passengers: double, LocationID: bigint, Borough: string, Zone: string, service_zone: string]]

_Table 1. Pipeline performance for `parquet` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |  2.898033 |  15.571166 |  41.953716 |  90.443069 |  132.396785 |
| execution time   (w/o 1.2)  | 0.36 sec | 0.26 sec | 0.35 sec | 0.33 sec | 0.27 sec |
| execution time              | 3.20 sec | 11.18 sec | 27.66 sec | 31.32 sec | 29.98 sec |
| sec / 1M records (w/o 1.2)  | 0.124  | 0.017  | 0.008  | 0.004  | 0.002 |
| sec / 1M records            | 1.104  | 0.718  | 0.659  | 0.346  | 0.226  |

_Table 2. Pipeline performance for `delta` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |  2.898033 |  15.571166	 |  41.953716 |  90.443069 |  132.396785 |
| execution time   (w/o 1.2)  | 0.30 sec | 0.34 sec | 0.27 sec | 0.30 sec | 0.25 sec |
| execution time              | 3.54 sec | 4.97 sec | 7.22 sec | 14.88 sec | 19.89 sec |
| sec / 1M records (w/o 1.2)  | 0.104  | 0.022  | 0.006  | 0.003  | 0.002  |
| sec / 1M records            | 1.222  | 0.319  | 0.172  | 0.165  | 0.150  |

## Report
In analyzing the performance of the above PySpark pipeline we can see that as we progress from smaller ('S') to larger ('XXL') dataset sizes, there's a noticeable increase in execution time across both Parquet and Delta formats. This trend, while expected, is not entirely linear, illustrating how Spark's distributed processing capably handles larger datasets, albeit with increasing overhead. When comparing data formats, it's evident that each possesses unique efficiencies and costs. For instance, while Delta format may excel in handling updates and merges, it can incur greater overhead in other operations compared to Parquet, suggesting a trade-off based on the specific requirements of the task at hand.

Another crucial aspect is task complexity, particularly evident in the increased execution times observed when incorporating Task 1.2, which involves outlier removal using the modified Z-score method. This task adds computational complexity, necessitating additional data processing for statistical calculations, thereby extending the total processing time. Additionally, the 'sec / 1M records' metric serves as a useful indicator of the pipeline's scalability and efficiency, with lower values signifying better performance as data volume increases. 