# [CSC8101] Engineering for AI - 2024 Spark Coursework

Done by Raghavendra Reddy Andem - 230619823

## Coursework overview

### Inputs

- **NYC Taxi Trips dataset** - list of recorded taxi trips, each with several characteristics, namely: distance, number of passengers, origin zone, destination zone and trip cost (total amount charged to customer).
- **NYC Zones dataset** - list of zones wherein trips can originate/terminate.

### Tasks

1. Data cleaning
  1. Remove "0 distance" and 'no passengers' records.
  2. Remove outlier records. 
2. Add new columns
  1. Join with zones dataset
  2. Compute the unit profitability of each trip
3. Zone summarisation and ranking
  1. Summarise trip data per zone
  2. Obtain the top 10 ranks according to:
    1. The total trip volume
    2. Their average profitabilitiy
    3. The total passenger volume
4. Record the total and task-specific execution times for each dataset size and format.

### How to

###### Code structure and implementation

- You must implement your solution to each task in the provided function code skeleton.
- The task-specific functions are combined together to form the full pipeline code, executed last (do not modify this code).
- Before implementing the specified function skeleton, you should develop and test your solution on separate code cells (create and destroy cells as needed).

###### Development

- Develop an initial working solution for the 'S' dataset and only then optimise it for larger dataset sizes.
- To perform vectorised operations on a DataFrame:
  - use the API docs to look for existing vectorised functions in: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html
  - actions to get around the lazy execution of spark: 
  https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
  - if a customised function is required (e.g. to add a new column based on a linear combination of other columns), implement your own User Defined Function (UDF). See:  https://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
- Use only the `pyspark.sql` API - documentation link below - (note that searching through the docs returns results from the `pyspark.sql` API together with the `pyspark.pandas` API):
  - https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.sql.html
- Periodically download your notebook to your computer as backup and safety measure against accidental file deletion.
 
###### Execution time measurement

- Execution time is calculated and returned by the Spark Engine and shown in the output region of the cell.
- To measure the execution time of a task you must perform a `collect` or similar operation (e.g. `take`) on the returned DataFrame.

## Task 0 - Read data

The code below is ready to run. **Do not modify this code**. It does the following:

- Reads the 'zones' dataset into variable 'zone_names'
- Defines the `init_trips` function that allows you to read the 'trips' dataset (from the DBFS FileStore) given the dataset size ('S' to 'XXL') and format ('parquet' or 'delta') as function arguments
- Defines the `pipeline` function, called in Task 4 to measure the execution time of the entire data processing pipeline
- Shows you how to call the `init_trips` function and display dataset characteristics (number of rows, schema)

In [0]:
## global imports
import pyspark.sql as ps
import pyspark.sql.functions as pf
import pandas as pd

# Load zone names dataset - (much faster to read small file from git than dbfs)
zones_file_url = 'https://raw.githubusercontent.com/mutazb999/CSC8101-lab-and-coursework/main/02-assignment-spark/taxi_zone_names.csv'
zone_names = spark.createDataFrame(pd.read_csv(zones_file_url))

# Function to load trips dataset by selected dataset size
def init_trips(size = 'S', data_format = "parquet", taxi_folder = "/FileStore/tables/taxi"):     
    
    files = {
        'S'  : ['2021_07'],
        'M'  : ['2021'],
        'L'  : ['2020_21'], 
        'XL' : ['1_6_2019', '7_12_2019'],
        'XXL': ['1_6_2019', '7_12_2019', '2020_21']
    }
    
    # validate input dataset size
    if size not in files.keys():
        print("Invalid input dataset size. Must be one of {}".format(list(files.keys())))
        return None               
    
    if data_format == "parquet":
        filenames = list(map(lambda s: f'{taxi_folder}/parquet/tripdata_{s}.parquet', files[size]))
        trips_df = spark.read.parquet(filenames[0])
        
        for name in filenames[1:]:
            trips_df = trips_df.union(spark.read.parquet(name))
            
    elif data_format == "delta":
        filenames = f"{taxi_folder}/delta/taxi-{size}-delta/"
        trips_df = spark.read.format("delta").load(filenames)
    
    else:
        print("Invalid data format. Must be one of {}".format(['parquet', 'delta']))
        return None
        
    print(
    """
    Trips dataset loaded!
    ---
      Size: {s}
      Format: {f}
      Tables loaded: {ds}
      Number of trips (dataset rows): {tc:,}
    """.format(s = size, f = data_format, ds = filenames, tc = trips_df.count()))
    
    return trips_df

# helper function to print dataset row count
def print_count(df):
    print("Row count: {t:,}".format(t = df.count()))

def pipeline(trips_df, with_task_12 = False, zones_df = zone_names):
    # Do not edit
    #---

    ## Task 1.1
    _trips_11 = t11_remove_zeros(trips_df)

    ## Task 1.2
    if with_task_12:
        _trips_12 = t12_remove_outliers(_trips_11)
    else:
        _trips_12 = _trips_11

    ## Task 2.1
    _trips_21 = t21_join_zones(_trips_12, zones_df = zone_names)

    ## Task 2.2
    _trips_22 = t22_calc_profit(_trips_21)

    ## Task 3.1
    _graph = t31_summarise_trips(_trips_22)

    ## Task 3.2
    _zones = t32_summarise_zones_pairs(_graph)

    _top10_trips     = t32_top10_trips(_zones)
    _top10_profit    = t32_top10_profit(_zones)
    _top10_passenger = t32_top10_passenger(_zones)
    
    return([_top10_trips, _top10_profit, _top10_passenger])

In [0]:
# CHANGE the value of argument 'size' to record the pipeline execution times for increasing dataset sizes
SIZE = 'S'
DATA_FORMAT = 'parquet'

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)

# uncomment line only for small datasets
# trips.take(1)


    Trips dataset loaded!
    ---
      Size: S
      Format: parquet
      Tables loaded: ['/FileStore/tables/taxi/parquet/tripdata_2021_07.parquet']
      Number of trips (dataset rows): 2,898,033
    


In [0]:
print_count(trips)

Row count: 2,898,033


In [0]:
# dataset schemas
trips.printSchema()

root
 |-- index: long (nullable = true)
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |--

In [0]:
display(trips[['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']].take(5))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
142,238,2.0,0.0,12.3


In [0]:
zone_names.printSchema()

root
 |-- LocationID: long (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [0]:
display(zone_names.take(5))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone


## Task 1 - Filter rows

**Input:** trips dataset

### Task 1.1 - Remove "0 distance" and 'no passengers' records

Remove dataset rows that represent invalid trips:

- Trips where `trip_distance == 0` (no distance travelled)
- Trips where `passenger_count == 0` and `total_amount == 0` (we want to retain records where `total_amount` > 0 - these may be significant as the taxi may have carried some parcel, for example)

Altogether, a record is removed if it satisfies the following conditions:

`trip_distance == 0` or `(passenger_count == 0` and `total_amount == 0)`.

**Recommended:** Select only the relevant dataset columns for this and subsequent tasks: `['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')]`

### Task 1.2 - Remove outliers using the modified z-score

Despite having removed spurious "zero passengers" trips in task 1.1, columns `total_amount` and `trip_distance` contain additional outlier values that must be identified and removed.

To identify and remove outliers, you will use the modified [z-score](https://en.wikipedia.org/wiki/Standard_score) method.
The modified z-score uses the median and [Median Absolute Deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (MAD), instead of the mean and standard deviation, to determine how far an observation (indexed by i) is from the mean:

$$z_i = \frac{x_i - \mathit{median}(\mathbf{x})}{\mathbf{MAD}},$$

where x represents the input vector, xi is an element of x and zi is its corresponding z-score. In turn, the MAD formula is:

$$\mathbf{MAD} = 1.483 * \mathit{median}(\big\lvert x_i - \mathit{median}(\mathbf{x})\big\rvert).$$

Observations with **high** (absolute) z-score are considered outlier observations. A score is considered **high** if its __absolute z-score__ is larger than a threshold T = 3.5:

$$\big\lvert z_i \big\rvert > 3.5.$$

where T represents the number of unit standard deviations beyond which a score is considered an outlier ([wiki](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)).

This process is repeated twice, once for each of the columns `total_amount` and `trip_distance` (in any order).

**Important:** Use the surrogate function [`percentile_approx`](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.percentile_approx.html?highlight=percentile#pyspark.sql.functions.percentile_approx) to estimate the median (calculating the median values for a column is expensive as it cannot be parallelised efficiently).

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

In [0]:
import pyspark.sql.functions as F

def t11_remove_zeros(df):
    # Selecting specific columns from the DataFrame
    df = df.select("PULocationID", "DOLocationID", "trip_distance", "passenger_count", "total_amount")
    # Filtering out rows where trip_distance is zero or both passenger_count and total_amount are zero
    df_new = df.filter(~((df.trip_distance == 0) | (df.passenger_count == 0) & (df.total_amount == 0)))
    
    return df_new


We first select only the columns "PULocationID", "DOLocationID", "trip_distance", "passenger_count", and "total_amount" from the DataFrame df. This reduces the DataFrame to only the columns we're interested in.

Then, we filter out rows where either trip_distance is zero or both passenger_count and total_amount are zero. We use the filter method along with logical conditions to achieve this. The ~ symbol negates the condition, so it filters out rows where the condition is not true.

In [0]:
# execute task 1.1
trips_11 = t11_remove_zeros(trips)
#Print the count of rows in the resulting DataFrame trips_11
print_count(trips_11)

## uncomment only for smaller datasets
display(trips_11.take(10))

Row count: 2,858,164


PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
142,238,2.0,0.0,12.3
114,90,1.6,1.0,12.8
90,144,1.8,1.0,13.3
114,48,2.0,1.0,14.75
48,152,5.7,1.0,22.3
234,148,1.8,1.0,14.75


In [0]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col

def t12_remove_outliers(df_new):

    # Calculate the median and MAD for total_amount
    total_amount_median = df_new.agg(F.expr("percentile_approx(total_amount, 0.5)").alias("median_total_amount")).collect()[0]["median_total_amount"]

    # Calculate absolute deviations from the median
    df_new = df_new.withColumn ("total_amount_xi", F.abs(df_new.total_amount - total_amount_median)) 

    # Calculate MAD (Median Absolute Deviation)
    total_amount_mad = 1.483 * df_new.agg(F.expr("percentile_approx(total_amount_xi, 0.5)").alias('total_amount_mad')).collect()[0]["total_amount_mad"]

    # Calculate z-score for total_amount
    df_new = df_new.withColumn ("z_score_total_amount", F.abs(col("total_amount") - total_amount_median) / total_amount_mad)


    # Calculate the median and MAD for trip_distance
    trip_distance_median = df_new.agg(F.expr("percentile_approx(trip_distance, 0.5)").alias("median_trip_distance")).collect()[0]["median_trip_distance"]

    # Calculate absolute deviations from the median
    df_new = df_new.withColumn("trip_distance_xi", F.abs(df_new.trip_distance - trip_distance_median))

    # Calculate MAD (Median Absolute Deviation)
    trip_distance_mad = 1.483 * df_new.agg(F.expr("percentile_approx(trip_distance_xi, 0.5)").alias('trip_distance_mad')).collect()[0]["trip_distance_mad"]

     # Calculate z-score for trip_distance
    df_new = df_new.withColumn("z_score_trip_distance", F.abs(col("trip_distance") - trip_distance_median) / trip_distance_mad)

    # Remove outliers
    df_new = df_new.filter(~(df_new.z_score_total_amount > 3.5)|(df_new.z_score_trip_distance > 3.5) )

    
    # Print median and MAD values for reference
    print("Median value for Total amount:",total_amount_median)
    print("MAD value for Total amount:",total_amount_mad)
    print("MAD value for Trip distance:",trip_distance_mad)
    print("Median value for Trip distance:",trip_distance_median)

    return df_new






In [0]:
# execute task 1.2
trips_12 = t12_remove_outliers(trips_11)

print_count(trips_12)
display(trips_12.take(10))

Median value for Total amount: 15.3
MAD value for Total amount: 6.317579999999998
MAD value for Trip distance: 1.4088500000000004
Median value for Trip distance: 1.9
Row count: 2,836,613


PULocationID,DOLocationID,trip_distance,passenger_count,total_amount,total_amount_xi,z_score_total_amount,trip_distance_xi,z_score_trip_distance
90,68,0.8,1.0,8.8,6.5,1.0288749806096642,1.1,0.7807786492529365
113,90,0.9,1.0,8.8,6.5,1.0288749806096642,1.0,0.7097987720481241
88,232,2.8,1.0,13.8,1.5,0.2374326878329994,0.8999999999999999,0.6388188948433117
79,249,1.4,1.0,12.3,3.0,0.4748653756659988,0.5,0.354899386024062
142,238,2.0,0.0,12.3,3.0,0.4748653756659988,0.1,0.0709798772048124
114,90,1.6,1.0,12.8,2.5,0.3957211463883324,0.2999999999999998,0.2129396316144371
90,144,1.8,1.0,13.3,2.0,0.3165769171106659,0.0999999999999998,0.0709798772048123
114,48,2.0,1.0,14.75,0.5500000000000007,0.0870586522054332,0.1,0.0709798772048124
48,152,5.7,1.0,22.3,7.0,1.1080192098873307,3.8,2.697235333782872
234,148,1.8,1.0,14.75,0.5500000000000007,0.0870586522054332,0.0999999999999998,0.0709798772048123


## Task 2 - Compute new columns

### Task 2.1 - Zone names

Obtain the **start** and **end** zone names of each trip by joining the `trips` and `zone_names` datasets (i.e. by using the `zone_names` dataset as lookup table).

**Note:** The columns containing the start and end zone ids of each trip are named `PULocationID` and `DOLocationID`, respectively.

### Task 2.2 - Unit profitability

Compute the column `unit_profitability = total_amount / trip_distance`.

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

In [0]:
# Your solution implementation to task 2.1 goes HERE
def t21_join_zones(df_new, zones_df = zone_names):
    # input: output of task 1.2 and zone_names dataset
    trips_with_start_zones = df_new.join(zones_df, df_new['PULocationID'] == zones_df['LocationID'],how = 'left').select(df_new['*'],zones_df['Borough'].alias('start_Borough'), zones_df['Zone'].alias('start_Zone'), zones_df['service_zone'].alias('start_service_zone'))

    trips_with_end_zones = df_new.join(zones_df, df_new['DOLocationID'] == zones_df['LocationID'],how = 'left').select(df_new['*'],zones_df['Borough'].alias('end_Borough'), zones_df['Zone'].alias('end_Zone'), zones_df['service_zone'].alias('end_service_zone'))
    
    return trips_with_start_zones , trips_with_end_zones

In [0]:
from pyspark.sql.functions import expr, col
def t21_join_zones(df_new, zones_df = zone_names):
    # Joining trips_df with zone_names_df to get start zone names
    trips_with_start_zones = zones_df.withColumnRenamed('LocationID' , 'PULocationID')\
                                .withColumnRenamed('Zone' , 'start_zone')
    
    # Joining trips_df with zone_names_df to get end zone names
    trips_with_end_zones = zones_df.withColumnRenamed('LocationID' , 'DOLocationID')\
                                .withColumnRenamed('Zone' , 'end_zone')
    
    # Adding end zone columns to trips_with_start_zones
    df_new = df_new.join(trips_with_start_zones , ['PULocationID'] , 'left')\
                    .join(trips_with_end_zones, ['DOLocationID'] , 'left' )

    
    return df_new


In [0]:
# execute task 2.1
trips_21 = t21_join_zones(trips_12, zones_df = zone_names)

print_count(trips_21)
display(trips_21.take(10))

Row count: 2,836,613


DOLocationID,PULocationID,trip_distance,passenger_count,total_amount,total_amount_xi,z_score_total_amount,trip_distance_xi,z_score_trip_distance,Borough,start_zone,service_zone,Borough.1,end_zone,service_zone.1
68,90,0.8,1.0,8.8,6.5,1.0288749806096642,1.1,0.7807786492529365,Manhattan,Flatiron,Yellow Zone,Manhattan,East Chelsea,Yellow Zone
90,113,0.9,1.0,8.8,6.5,1.0288749806096642,1.0,0.7097987720481241,Manhattan,Greenwich Village North,Yellow Zone,Manhattan,Flatiron,Yellow Zone
232,88,2.8,1.0,13.8,1.5,0.2374326878329994,0.8999999999999999,0.6388188948433117,Manhattan,Financial District South,Yellow Zone,Manhattan,Two Bridges/Seward Park,Yellow Zone
249,79,1.4,1.0,12.3,3.0,0.4748653756659988,0.5,0.354899386024062,Manhattan,East Village,Yellow Zone,Manhattan,West Village,Yellow Zone
238,142,2.0,0.0,12.3,3.0,0.4748653756659988,0.1,0.0709798772048124,Manhattan,Lincoln Square East,Yellow Zone,Manhattan,Upper West Side North,Yellow Zone
90,114,1.6,1.0,12.8,2.5,0.3957211463883324,0.2999999999999998,0.2129396316144371,Manhattan,Greenwich Village South,Yellow Zone,Manhattan,Flatiron,Yellow Zone
144,90,1.8,1.0,13.3,2.0,0.3165769171106659,0.0999999999999998,0.0709798772048123,Manhattan,Flatiron,Yellow Zone,Manhattan,Little Italy/NoLiTa,Yellow Zone
48,114,2.0,1.0,14.75,0.5500000000000007,0.0870586522054332,0.1,0.0709798772048124,Manhattan,Greenwich Village South,Yellow Zone,Manhattan,Clinton East,Yellow Zone
152,48,5.7,1.0,22.3,7.0,1.1080192098873307,3.8,2.697235333782872,Manhattan,Clinton East,Yellow Zone,Manhattan,Manhattanville,Boro Zone
148,234,1.8,1.0,14.75,0.5500000000000007,0.0870586522054332,0.0999999999999998,0.0709798772048123,Manhattan,Union Sq,Yellow Zone,Manhattan,Lower East Side,Yellow Zone


In [0]:
def t22_calc_profit(df_new):
    """
    Task 2.2: Compute the column unit_profitability = total_amount / trip_distance.
    """
    # Compute unit profitability and add it to the DataFrame
    df_new = df_new.withColumn("unit_profitability", df_new["total_amount"] / df_new["trip_distance"])
    
    return df_new



In [0]:
# execute task 2.2
trips_22 = t22_calc_profit(trips_21)

print_count(trips_22)
display(trips_22.take(100))

Row count: 2,836,613


DOLocationID,PULocationID,trip_distance,passenger_count,total_amount,total_amount_xi,z_score_total_amount,trip_distance_xi,z_score_trip_distance,Borough,start_zone,service_zone,Borough.1,end_zone,service_zone.1,unit_profitability
68,90,0.8,1.0,8.8,6.5,1.0288749806096642,1.1,0.7807786492529365,Manhattan,Flatiron,Yellow Zone,Manhattan,East Chelsea,Yellow Zone,11.0
90,113,0.9,1.0,8.8,6.5,1.0288749806096642,1.0,0.7097987720481241,Manhattan,Greenwich Village North,Yellow Zone,Manhattan,Flatiron,Yellow Zone,9.77777777777778
232,88,2.8,1.0,13.8,1.5,0.2374326878329994,0.8999999999999999,0.6388188948433117,Manhattan,Financial District South,Yellow Zone,Manhattan,Two Bridges/Seward Park,Yellow Zone,4.928571428571429
249,79,1.4,1.0,12.3,3.0,0.4748653756659988,0.5,0.354899386024062,Manhattan,East Village,Yellow Zone,Manhattan,West Village,Yellow Zone,8.785714285714286
238,142,2.0,0.0,12.3,3.0,0.4748653756659988,0.1,0.0709798772048124,Manhattan,Lincoln Square East,Yellow Zone,Manhattan,Upper West Side North,Yellow Zone,6.15
90,114,1.6,1.0,12.8,2.5,0.3957211463883324,0.2999999999999998,0.2129396316144371,Manhattan,Greenwich Village South,Yellow Zone,Manhattan,Flatiron,Yellow Zone,8.0
144,90,1.8,1.0,13.3,2.0,0.3165769171106659,0.0999999999999998,0.0709798772048123,Manhattan,Flatiron,Yellow Zone,Manhattan,Little Italy/NoLiTa,Yellow Zone,7.388888888888889
48,114,2.0,1.0,14.75,0.5500000000000007,0.0870586522054332,0.1,0.0709798772048124,Manhattan,Greenwich Village South,Yellow Zone,Manhattan,Clinton East,Yellow Zone,7.375
152,48,5.7,1.0,22.3,7.0,1.1080192098873307,3.8,2.697235333782872,Manhattan,Clinton East,Yellow Zone,Manhattan,Manhattanville,Boro Zone,3.912280701754386
148,234,1.8,1.0,14.75,0.5500000000000007,0.0870586522054332,0.0999999999999998,0.0709798772048123,Manhattan,Union Sq,Yellow Zone,Manhattan,Lower East Side,Yellow Zone,8.194444444444445


## Task 3: Rank zones by traffic, passenger volume and profitability

### 3.1 - Summarise interzonal travel

Build a graph data structure of zone-to-zone traffic, representing aggregated data about trips between any two zones. The graph will have one node for each zone and one edge connecting each pair of zones. In addition, edges contain aggregate information about all trips between those zones. 

For example, zones Z1 and Z2 are connected by *two* edges: edge Z1 --> Z2 carries aggregate data about all trips that originated in Z1 and ended in Z2, and edge Z2 --> Z1 carries aggregate data about all trips that originated in Z2 and ended in Z1.

The aggregate information of interzonal travel must include the following data:

- `average_unit_profit` - the average unit profitability (calculated as `mean(unit_profitability)`).
- `trips_count` -- the total number of recorded trips.
- `total_passengers` -- the total number of passenger across all trips (sum of `passenger_count`).

This graph can be represented as a new dataframe, with schema:

\[`PULocationID`, `DOLocationID`, `average_unit_profit`, `trips_count`, `total_passengers` \]

__hint__: the `groupby()` operator produces a `pyspark.sql.GroupedData` structure. You can then calculate multiple aggregations from this using `pyspark.sql.GroupedData.agg()`: 
- https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.groupby.html
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.agg.html

### Task 3.2 - Obtain top-10 zones

For each of the following measures, report the top-10 zones _using their plain names you dereferenced in the previous step, not the codes_. Note that this requires ranking the nodes in different orders. Specifically, you need to calculate the following further aggregations:

- the **total** number of trips originating from Z. This is simply the sum of `trips_count` over all outgoing edges for Z, i.e., edges of the form Z -> \*
- the **average** profitability of a zone. This is the average of all `average_unit_profit` over all *outgoing* edges from Z.
- The **total** passenger volume measured as the **sum** of `total_passengers` carried in trips that originate from Z

In [0]:
# develop your solution here (create/destroy cells as needed) and then implement it in the functions below

In [0]:
## Your solution to task 3.1 goes HERE
from pyspark.sql.functions import mean, sum , count
def t31_summarise_trips(df_new):
    # input: output of task 2.2
    grouped_data = df_new.groupby("PULocationID", "DOLocationID")
    # Aggregate data to calculate average unit profitability, trips count, and total passengers
    aggregated_data = grouped_data.agg(
    mean("unit_profitability").alias("average_unit_profit"),
    sum("passenger_count").alias("total_passengers"),
    count("*"). alias("trips_count")
)

# Create a new DataFrame with the aggregated information
    df_new = aggregated_data.select(
    "PULocationID",
    "DOLocationID",
    "average_unit_profit",
    "trips_count",
    "total_passengers"
)

# Show the resulting DataFrame
    df_new.show()

    return df_new

In [0]:
# execute task 3.1
graph = t31_summarise_trips(trips_22)

print_count(graph)
display(graph.take(10))

+------------+------------+-------------------+-----------+----------------+
|PULocationID|DOLocationID|average_unit_profit|trips_count|total_passengers|
+------------+------------+-------------------+-----------+----------------+
|          97|          29| 3.3008819056842604|          3|             3.0|
|         161|          29| 3.4599615427882955|          6|             6.0|
|         132|          26| 2.9664356587202683|        253|           412.0|
|         132|          29| 3.1044577420063386|        168|           243.0|
|         137|          26|  4.132075380450283|         27|            37.0|
|         213|          29| 2.9749504372386073|          2|             3.0|
|         228|          26|   7.41471058428216|          8|             2.0|
|         108|          26|  4.812733426704014|          2|             2.0|
|         236|          26| 3.8505210918536403|         11|            17.0|
|          79|          29|  3.275157759773647|          9|            11.0|

PULocationID,DOLocationID,average_unit_profit,trips_count,total_passengers
97,29,3.3008819056842604,3,3.0
161,29,3.4599615427882955,6,6.0
132,26,2.9664356587202683,253,412.0
132,29,3.1044577420063386,168,243.0
137,26,4.132075380450283,27,37.0
213,29,2.9749504372386077,2,3.0
228,26,7.41471058428216,8,2.0
108,26,4.812733426704014,2,2.0
236,26,3.85052109185364,11,17.0
79,29,3.275157759773647,9,11.0


In [0]:
# Your solution to task 3.2 goes HERE (implement each of the functions below)
from pyspark.sql.functions import desc
import time

def t32_summarise_zones_pairs(df_new, zones_df = zone_names):
    df_with_names = df_new.join(zones_df, df_new.PULocationID == zones_df.LocationID, "left") \
                      .withColumnRenamed("ZoneName", "PULocationName") \
                      .drop("LocationID") \
                      .join(zones_df, df_new.DOLocationID == zones_df.LocationID, "left") \
                      .withColumnRenamed("ZoneName", "DOLocationName") \
                      .drop("LocationID")
    return df_with_names

# Top 10 ranked zones by traffic (trip volume)
def t32_top10_trips(df_zones):
    top_10_trips = df_zones.groupBy("PULocationID") \
                           .agg({"trips_count": "sum"}) \
                           .withColumnRenamed("sum(trips_count)", "total_trips_count") \
                           .orderBy(desc("total_trips_count")) \
                           .limit(10)
    return top_10_trips

# Top 10 ranked zones by profit
def t32_top10_profit(df_zones):
    top_10_profit = df_zones.groupBy("PULocationID") \
                             .agg({"average_unit_profit": "avg"}) \
                             .withColumnRenamed("avg(average_unit_profit)", "average_profitability") \
                             .orderBy(desc("average_profitability")) \
                             .limit(10)
    return top_10_profit

# Top 10 ranked zones by passenger volume
def t32_top10_passenger(df_zones):
    top_10_passenger = df_zones.groupBy("PULocationID") \
                                .agg({"total_passengers": "sum"}) \
                                .withColumnRenamed("sum(total_passengers)", "total_passenger_volume") \
                                .orderBy(desc("total_passenger_volume")) \
                                .limit(10)
    return top_10_passenger


In [0]:
# execute task 3.2
zones = t32_summarise_zones_pairs(graph)

top10_trips     = t32_top10_trips(zones)
top10_profit    = t32_top10_profit(zones)
top10_passenger = t32_top10_passenger(zones)

In [0]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_trips)

PULocationID,total_trips_count
237,126262
236,108539
132,106127
186,102196
161,98102
170,93217
162,89669
142,84331
48,80006
239,78287


In [0]:
# use 'display()' return a pandas DataFrame for 'pretty' output
display(top10_profit)

PULocationID,average_profitability
190,62.27038689066288
15,24.136390098078515
241,20.85511049829426
59,20.23633830399885
192,18.03549013392413
207,17.256668502562587
10,13.870876308735046
128,13.439850756388974
81,11.21790620709023
168,10.91970173337134


In [0]:
# use 'display()' or return a pandas DataFrame for 'pretty' output
display(top10_passenger)

PULocationID,total_passenger_volume
237,175915.0
132,165301.0
236,150392.0
186,144805.0
161,141128.0
170,129657.0
162,125970.0
142,119147.0
48,114519.0
79,110808.0


## Task 4 - Record the pipeline's execution time

Record the execution time of:

1. the whole pipeline
2. the whole pipeline except task 1.2

on the two tables below, for all dataset sizes: `'S'`, `'M'`, `'L'`, `'XL'`, `'XXL'`, and data formats: `parquet` and `delta`.

Analyse the resulting execution times and comment on the effect of dataset size, dataset format and task complexity (with and without task 1.2) on pipeline performance.

In [0]:
# CHANGE the value of the following arguments to record the pipeline execution times for increasing dataset sizes
SIZE = 'S'
DATA_FORMAT = 'delta'
WITH_TASK_12 = True

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)


    Trips dataset loaded!
    ---
      Size: S
      Format: delta
      Tables loaded: /FileStore/tables/taxi/delta/taxi-S-delta/
      Number of trips (dataset rows): 2,898,033
    


In [0]:
# run and record the resulting execution time shown by databricks (on the cell footer)

# IMPORTANT: this function calls all task functions in order of occurrence. For this code to run without errors, you have to load into memory all of the previous task-specific functions, even if you haven't implemented these yet.
pipeline(trips, with_task_12 = WITH_TASK_12)

Median value for Total amount: 15.3
MAD value for Total amount: 6.317579999999998
MAD value for Trip distance: 1.4088500000000004
Median value for Trip distance: 1.9
+------------+------------+-------------------+-----------+----------------+
|PULocationID|DOLocationID|average_unit_profit|trips_count|total_passengers|
+------------+------------+-------------------+-----------+----------------+
|          97|          29| 3.3008819056842604|          3|             3.0|
|         161|          29| 3.4599615427882955|          6|             6.0|
|         132|          26| 2.9664356587202683|        253|           412.0|
|         132|          29| 3.1044577420063386|        168|           243.0|
|         137|          26|  4.132075380450283|         27|            37.0|
|         213|          29| 2.9749504372386073|          2|             3.0|
|         228|          26|   7.41471058428216|          8|             2.0|
|         108|          26|  4.812733426704014|          2|     

[DataFrame[PULocationID: bigint, total_trips_count: bigint],
 DataFrame[PULocationID: bigint, average_profitability: double],
 DataFrame[PULocationID: bigint, total_passenger_volume: double]]

_Table 1. Pipeline performance for `parquet` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |2,898,033|15,571,166|41,953,716|90,443,069|132,396,785|
| execution time   (w/o 1.2)  |2.61s|4.43s|7.14s|8.37s|11.40s|
| execution time              |8.70s|20.05s|23.91s|2.43 Min|3.23 Min|
| sec / 1M records (w/o 1.2)  |0.000901|0.000284|0.000170|0.000093|0.000086|
| sec / 1M records            |0.0030|0.0013|0.0006|0.0007|0.0008|

_Table 2. Pipeline performance for `delta` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |2,898,033|15,571,166|41,953,716|90,443,069|132,396,785|
| execution time   (w/o 1.2)  |1.31s|1.80s|2.67s|3.54s|6.74s|
| execution time              |2.27s|5.83s|15.46s|15.81s|21.16s|
| sec / 1M records (w/o 1.2)  |0.000452  |0.000115  |0.000064  |0.000039  |0.000051  |
| sec / 1M records            |0.000783  |0.000374  |0.000369  |0.000175  |0.000160  |

The execution time of the pipeline increases notably as the dataset size grows from small (S) to extra-extra-large (XXL).This exponential increase in execution time suggests that the pipeline's performance is significantly impacted by the size of the dataset.

Examining the "sec / 1M records" metrics reveals interesting insights into the efficiency of the pipeline. Despite the increase in execution time with dataset size, the time taken per million records decreases. This implies that although larger datasets require more processing time overall, the processing time per individual record decreases as the dataset size increases. This phenomenon suggests potential optimizations or scalability features within the pipeline that enhance efficiency as the dataset size grows.