# NYC Taxi Trips Big Data Analytics - Spark

## Overview

### Inputs

- **NYC Taxi Trips dataset** - list of recorded taxi trips, each with several characteristics, namely: distance, number of passengers, origin zone, destination zone and trip cost (total amount charged to customer).
- **NYC Zones dataset** - list of zones wherein trips can originate/terminate.

### Steps

1. Data cleaning:
  <br>1. Remove "0 distance" and 'no passengers' records.
  <br>2. Remove outlier records. 
2. Add new columns:
  <br>1. Join taxi trips dataset with zones dataset
  <br>2. Compute the unit profitability of each trip
3. Zone summarisation and ranking:
  <br>1. Summarise trip data per zone
  <br>2. Obtain the top 10 ranks according to:
      1.) The total trip volume
      2.) Their average profitabilitiy
      3.) The total passenger volume
4. Record the total and task-specific execution times for each dataset size and format.
 
### Execution time measurement

- Execution time is calculated and returned by the Spark Engine and shown in the output region of the cell.
- To measure the execution time of a task we perform a `collect` or similar operation (e.g. `take`) on the returned DataFrame.

## Task 0 - Read data

The code below is ready to run. **Do not modify this code**. It does the following:

- Reads the 'zones' dataset into variable 'zone_names'
- Defines the `init_trips` function that allows you to read the 'trips' dataset (from the DBFS FileStore) given the dataset size ('S' to 'XXL') and format ('parquet' or 'delta') as function arguments
- Defines the `pipeline` function, called in Task 4 to measure the execution time of the entire data processing pipeline
- Shows you how to call the `init_trips` function and display dataset characteristics (number of rows, schema)

In [None]:
## global imports
import pyspark.sql as ps
import pyspark.sql.functions as pf
import pandas as pd

# Load zone names dataset - (much faster to read small file from git than dbfs)
zones_file_url = 'https://raw.githubusercontent.com/AuliaAmirullah15/NYC_Taxi_Trips_Analytics/main/taxi_zone_names.csv'
zone_names = spark.createDataFrame(pd.read_csv(zones_file_url))

# Function to load trips dataset by selected dataset size
def init_trips(size = 'S', data_format = "parquet", taxi_folder = "/FileStore/tables/taxi"):     
    
    files = {
        'S'  : ['2021_07'],
        'M'  : ['2021'],
        'L'  : ['2020_21'],
        'XL' : ['1_6_2019', '7_12_2019'],
        'XXL': ['1_6_2019', '7_12_2019', '2020_21']
    }
    
    # validate input dataset size
    if size not in files.keys():
        print("Invalid input dataset size. Must be one of {}".format(list(files.keys())))
        return None               
    
    if data_format == "parquet":
        filenames = list(map(lambda s: f'{taxi_folder}/parquet/tripdata_{s}.parquet', files[size]))
        trips_df = spark.read.parquet(filenames[0])
        
        for name in filenames[1:]:
            trips_df = trips_df.union(spark.read.parquet(name))
            
    elif data_format == "delta":
        filenames = f"{taxi_folder}/delta/taxi-{size}-delta/"
        trips_df = spark.read.format("delta").load(filenames)
    
    else:
        print("Invalid data format. Must be one of {}".format(['parquet', 'delta']))
        return None
        
    print(
    """
    Trips dataset loaded!
    ---
      Size: {s}
      Format: {f}
      Tables loaded: {ds}
      Number of trips (dataset rows): {tc:,}
    """.format(s = size, f = data_format, ds = filenames, tc = trips_df.count()))
    
    return trips_df

# helper function to print dataset row count
def print_count(df):
    print("Row count: {t:,}".format(t = df.count()))

def pipeline(trips_df, with_step_12 = False, zones_df = zone_names):

    ## Step 1.1
    _trips_11 = t11_remove_zeros(trips_df)

    ## Step 1.2
    if with_step_12:
        _trips_12 = t12_remove_outliers(_trips_11)
    else:
        _trips_12 = _trips_11

    ## Step 2.1
    _trips_21 = t21_join_zones(_trips_12, zones_df = zone_names)

    ## Step 2.2
    _trips_22 = t22_calc_profit(_trips_21)

    ## Step 3.1
    _graph = t31_summarise_trips(_trips_22)

    ## Step 3.2
    _zones = t32_summarise_zones_pairs(_graph)

    _top10_trips     = t32_top10_trips(_zones)
    _top10_profit    = t32_top10_profit(_zones)
    _top10_passenger = t32_top10_passenger(_zones)
    
    return([_top10_trips, _top10_profit, _top10_passenger])

In [None]:
# CHANGE the value of argument 'size' to record the pipeline execution times for increasing dataset sizes
SIZE = 'S'
DATA_FORMAT = 'parquet'

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)

# uncomment line only for small datasets
# trips.take(1)


    Trips dataset loaded!
    ---
      Size: S
      Format: parquet
      Tables loaded: ['/FileStore/tables/taxi/parquet/tripdata_2021_07.parquet']
      Number of trips (dataset rows): 2,898,033
    


In [None]:
print_count(trips)

Row count: 2,898,033


In [None]:
# dataset schemas
trips.printSchema()

root
 |-- index: long (nullable = true)
 |-- VendorID: double (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- cab_type: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |--

In [None]:
display(trips[['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']].take(5))

PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
142,238,2.0,0.0,12.3


In [None]:
zone_names.printSchema()

root
 |-- LocationID: long (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [None]:
display(zone_names.take(5))

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone


## Task 1 - Filter rows

**Input:** trips dataset

### Task 1.1 - Remove "0 distance" and 'no passengers' records

Remove dataset rows that contains invalid trips:

- Trips where `trip_distance == 0` (no distance travelled)
- Trips where `passenger_count == 0` and `total_amount == 0` (we retain records where `total_amount` > 0 - the taxi may have carried some parcel, etc)

Altogether, a record is removed if it satisfies the following conditions:

`trip_distance == 0` or `(passenger_count == 0` and `total_amount == 0)`.

**Recommended:** Select only the relevant dataset columns that supports our pipeline: `['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount')]`

### Task 1.2 - Remove outliers using the modified z-score

Even though we have removed "zero passengers" trips in Step 1.1, columns `total_amount` and `trip_distance` still contain additional outlier values that must be identified and removed.

To identify and remove outliers, we need to use the modified [z-score](https://en.wikipedia.org/wiki/Standard_score) method.
The modified z-score uses the median and [Median Absolute Deviation](https://en.wikipedia.org/wiki/Median_absolute_deviation) (MAD), instead of the mean and standard deviation, to determine how far an observation (indexed by i) is from the mean:

$$z_i = \frac{x_i - \mathit{median}(\mathbf{x})}{\mathbf{MAD}},$$

where x represents the input vector, xi is an element of x and zi is its corresponding z-score. In turn, the MAD formula is:

$$\mathbf{MAD} = 1.483 * \mathit{median}(\big\lvert x_i - \mathit{median}(\mathbf{x})\big\rvert).$$

Observations with **high** (absolute) z-score are considered outlier observations. A score is considered **high** if its __absolute z-score__ is larger than a threshold T = 3.5:

$$\big\lvert z_i \big\rvert > 3.5.$$

where T represents the number of unit standard deviations beyond which a score is considered an outlier ([wiki](https://en.wikipedia.org/wiki/68%E2%80%9395%E2%80%9399.7_rule)).

This process is repeated twice, once for each of the columns `total_amount` and `trip_distance` (in any order).

**Important:** Use the surrogate function [`percentile_approx`](https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.percentile_approx.html?highlight=percentile#pyspark.sql.functions.percentile_approx) to estimate the median (calculating the median values for a column is expensive as it cannot be parallelised efficiently).

In [None]:
# STEP 1.1 REMOVE ZEROS
def t11_remove_zeros(df):
    # input: trips dataset
    df = df.filter(df.trip_distance > 0) # REMOVE 0 distance
    df = df.filter((df.passenger_count > 0) & (df.total_amount > 0)) # RETAIN records where passengers and total amount are more than 0
    df = df.select(['PULocationID', 'DOLocationID', 'trip_distance', 'passenger_count', 'total_amount']) # ONLY SELECT these 5 fields
    return df

In [None]:
# execute step 1.1
trips_11 = t11_remove_zeros(trips)

print_count(trips_11)

## uncomment only for smaller datasets
display(trips_11.take(10))

Row count: 2,620,121


PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
114,90,1.6,1.0,12.8
90,144,1.8,1.0,13.3
114,48,2.0,1.0,14.75
48,152,5.7,1.0,22.3
234,148,1.8,1.0,14.75
79,141,3.5,2.0,17.3


In [None]:
# STEP 1.2 REMOVE OUTLIERS
def t12_remove_outliers(df):
    med = df.select(pf.percentile_approx("total_amount", 0.5).alias("total_amount_med")) # Median Value of The Total Amount
    taMed = med.select(pf.max('total_amount_med')).first()[0] # Assign total_amount_med to a variable
    
    df = df.withColumn("total_amount - total_amount_MED", pf.abs(pf.col("total_amount")-taMed)) # Add a new column for total amount - taMed
    med = df.select(pf.percentile_approx("total_amount - total_amount_MED", 0.5).alias("NewtaMed")) # Median value of ta - taMed
    NewtaMed = med.select(pf.max('NewtaMed')).first()[0] # Reassign variable med with a new value
    taMad = NewtaMed*1.483 # Working out taMad and setting it as a variable
    
    df = df.withColumn("Abs Z Score Total Amount", pf.abs((pf.col("total_amount")-taMed)/taMad)) # Create new column for Abs Z score
    
    med = df.select(pf.percentile_approx("trip_distance", 0.5).alias("trip_distance_med")) # Median Value of the trip distance
    tdMed = med.select(pf.max("trip_distance_med")).first()[0] # Assign trip_distance_med to a new variable
    df = df.withColumn("trip_distance - trip_distance_MED", pf.abs(pf.col("trip_distance")-tdMed)) # Add a new column for trip distance -tdMed
    med = df.select(pf.percentile_approx("trip_distance - trip_distance_MED", 0.5).alias("NewtdMed")) # Median value of ta - taMed
    NewtdMed = med.select(pf.max("NewtdMed")).first()[0] # Assign the New med to a variable
    tdMad = NewtdMed*1.483 # Working out tdMad and setting as a variable
    
    df = df.withColumn("Abs Z Score Trip Distance", pf.abs((pf.col("trip_distance")-tdMed)/tdMad))
    
    df = df.filter(pf.col("Abs Z Score Total Amount") < 3.5).drop("total_amount - total_amount_MED","Abs Z Score Total Amount") # Filter where TA < 3.5
    df = df.filter(pf.col("Abs Z Score Trip Distance") < 3.5).drop("trip_distance - trip_distance_MED","Abs Z Score Trip Distance") # Filtering where TD < 3.5
    
    return df

In [None]:
# execute step 1.2
trips_12 = t12_remove_outliers(trips_11)

print_count(trips_12)
display(trips_12.take(10))

Row count: 2,315,721


PULocationID,DOLocationID,trip_distance,passenger_count,total_amount
90,68,0.8,1.0,8.8
113,90,0.9,1.0,8.8
88,232,2.8,1.0,13.8
79,249,1.4,1.0,12.3
114,90,1.6,1.0,12.8
90,144,1.8,1.0,13.3
114,48,2.0,1.0,14.75
48,152,5.7,1.0,22.3
234,148,1.8,1.0,14.75
79,141,3.5,2.0,17.3


## STEP 2 - Compute new columns

### Step 2.1 - Zone names

Obtain the **start** and **end** zone names of each trip by joining the `trips` and `zone_names` datasets (i.e. by using the `zone_names` dataset as lookup table).

**Note:** The columns containing the start and end zone ids of each trip are named `PULocationID` and `DOLocationID`, respectively.

### Step 2.2 - Unit profitability

Compute the column `unit_profitabilty = total_amount / trip_distance`.

In [None]:
# STEP 2.1 GET THE ZONE NAMES
def t21_join_zones(df, zones_df = zone_names):
    # input: output of step 1.2 and zone_names dataset
    # 1. Obtain Start Zone
    df = df.join(zone_names, df.PULocationID == zones_df.LocationID, 'left').drop("LocationID",'Borough')
    df = df.withColumnRenamed("Zone", "start_zone").withColumnRenamed("service_zone", "start_service_zone")
    
    # 2. Obtain End Zone
    df = df.join(zone_names, df.DOLocationID == zones_df.LocationID, 'left').drop("LocationID",'Borough')
    df = df.withColumnRenamed("Zone", "end_zone").withColumnRenamed("service_zone", "end_service_zone")
    return df

In [None]:
# execute step 2.1
trips_21 = t21_join_zones(trips_12, zones_df = zone_names)

print_count(trips_21)
display(trips_21.take(10))

Row count: 2,315,721


PULocationID,DOLocationID,trip_distance,passenger_count,total_amount,start_zone,start_service_zone,end_zone,end_service_zone
90,68,0.8,1.0,8.8,Flatiron,Yellow Zone,East Chelsea,Yellow Zone
113,90,0.9,1.0,8.8,Greenwich Village North,Yellow Zone,Flatiron,Yellow Zone
88,232,2.8,1.0,13.8,Financial District South,Yellow Zone,Two Bridges/Seward Park,Yellow Zone
79,249,1.4,1.0,12.3,East Village,Yellow Zone,West Village,Yellow Zone
114,90,1.6,1.0,12.8,Greenwich Village South,Yellow Zone,Flatiron,Yellow Zone
90,144,1.8,1.0,13.3,Flatiron,Yellow Zone,Little Italy/NoLiTa,Yellow Zone
114,48,2.0,1.0,14.75,Greenwich Village South,Yellow Zone,Clinton East,Yellow Zone
48,152,5.7,1.0,22.3,Clinton East,Yellow Zone,Manhattanville,Boro Zone
234,148,1.8,1.0,14.75,Union Sq,Yellow Zone,Lower East Side,Yellow Zone
79,141,3.5,2.0,17.3,East Village,Yellow Zone,Lenox Hill West,Yellow Zone


In [None]:
# STEP 2.2 CALCULATE THE UNIT PROFITABILITY
def t22_calc_profit(df):
    # input: output of task 2.1
    df = df.withColumn("unit_profitability", pf.when(df.trip_distance==0, 0).otherwise(pf.col("total_amount")/pf.col("trip_distance"))) # Obtain the column unit profitability with the condition if the denominator is 0, set it to 0 to prevent error/ infinite number
    
    return df

In [None]:
# execute task 2.2
trips_22 = t22_calc_profit(trips_21)

print_count(trips_22)
display(trips_22.take(10))

Row count: 2,315,721


PULocationID,DOLocationID,trip_distance,passenger_count,total_amount,start_zone,start_service_zone,end_zone,end_service_zone,unit_profitability
90,68,0.8,1.0,8.8,Flatiron,Yellow Zone,East Chelsea,Yellow Zone,11.0
113,90,0.9,1.0,8.8,Greenwich Village North,Yellow Zone,Flatiron,Yellow Zone,9.77777777777778
88,232,2.8,1.0,13.8,Financial District South,Yellow Zone,Two Bridges/Seward Park,Yellow Zone,4.928571428571429
79,249,1.4,1.0,12.3,East Village,Yellow Zone,West Village,Yellow Zone,8.785714285714286
114,90,1.6,1.0,12.8,Greenwich Village South,Yellow Zone,Flatiron,Yellow Zone,8.0
90,144,1.8,1.0,13.3,Flatiron,Yellow Zone,Little Italy/NoLiTa,Yellow Zone,7.388888888888889
114,48,2.0,1.0,14.75,Greenwich Village South,Yellow Zone,Clinton East,Yellow Zone,7.375
48,152,5.7,1.0,22.3,Clinton East,Yellow Zone,Manhattanville,Boro Zone,3.912280701754386
234,148,1.8,1.0,14.75,Union Sq,Yellow Zone,Lower East Side,Yellow Zone,8.194444444444445
79,141,3.5,2.0,17.3,East Village,Yellow Zone,Lenox Hill West,Yellow Zone,4.942857142857143


## Step 3: Rank zones by traffic, passenger volume and profitability

### 3.1 - Summarise interzonal travel

We built a graph data structure of zone-to-zone traffic, representing aggregated data about trips between any two zones. The graph will have one node for each zone and one edge connecting each pair of zones. In addition, edges contain aggregate information about all trips between those zones. 

For example, zones Z1 and Z2 are connected by *two* edges: edge Z1 --> Z2 carries aggregate data about all trips that originated in Z1 and ended in Z2, and edge Z2 --> Z2 carries aggregate data about all trips that originated in Z2 and ended in Z1.

The aggregate information of interzonal travel include the following data:

- `average_unit_profit` - the average unit profitability (calculated as `mean(unit_profitabilty)`).
- `trips_count` -- the total number of recorded trips.
- `total_passengers` -- the total number of passenger across all trips (sum of `passenger_count`).

This graph is represented as a new dataframe, with schema:

\[`PULocationID`, `DOLocationID`, `average_unit_profit`, `trips_count`, `total_passengers` \]

__hint__: the `groupby()` operator produces a `pyspark.sql.GroupedData` structure. We then calculated multiple aggregations from this using `pyspark.sql.GroupedData.agg()`: 
- https://spark.apache.org/docs/3.2.0/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.groupby.html
- https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.agg.html

### 3.2 - Obtain top-10 zones

For each of the following measures, we reported the top-10 zones _using their plain names we dereferenced in the previous step, not the codes_. Note that this requires ranking the nodes in different orders. Specifically, we calculated the following further aggregations:

- the **total** number of trips originating from Z. This is simply the sum of `trips_count` over all outgoing edges for Z, i.e., edges of the form Z -> \*
- the **average** profitability of a zone. This is the average of all `average_unit_profit` over all *outgoing* edges from Z.
- The **total** passenger volume measured as the **sum** of `total_passengers` carried in trips that originate from Z

In [None]:
## STEP 3.1 SUMMARISE TRIPS
def t31_summarise_trips(df):
    # input: output of step 2.2
    
    df = df.groupby(['PULocationID', 'DOLocationID']).agg({'trip_distance':'count','unit_profitability':'mean', 'passenger_count':'sum'}) # Calculating aggregations
    df = df.withColumnRenamed('count(trip_distance)','trips_count').withColumnRenamed('avg(unit_profitability)','average_unit_profit').withColumnRenamed('sum(passenger_count)','total_passengers') # Renaming the automatic renaming system to our preferrred column names
    df = df.select('PULocationID','DOLocationID','trips_count','average_unit_profit','total_passengers') # Selecting only 5 columns we're interested in
    
    return df

In [None]:
# execute step 3.1
graph = t31_summarise_trips(trips_22)

print_count(graph)
display(graph.take(10))

Row count: 9,992


PULocationID,DOLocationID,trips_count,average_unit_profit,total_passengers
90,231,1070,7.40012570278701,1576.0
87,33,103,6.8281408701862905,145.0
90,142,864,6.865108002795456,1276.0
114,100,435,7.453583718690992,705.0
234,144,1429,9.111810662896689,2186.0
148,262,246,4.800257985479026,398.0
246,249,1074,8.754483246045579,1713.0
170,179,74,5.116606255081469,101.0
48,232,304,5.423677115276072,496.0
142,144,318,5.74876810849625,486.0


In [None]:
# STEP 3.2 OBTAIN TOP 10 ZONES
def t32_summarise_zones_pairs(df, zones_df = zone_names):
    df = df.join(zones_df, df.PULocationID == zone_names.LocationID,'left').drop("LocationID", "Borough", "service_zone")
    df = df.withColumnRenamed("Zone", "PU Zone")
    
    return df

# Top 10 ranked zones by traffic (trip volume)
def t32_top10_trips(df_zones):
    # input: output of step 3.2
    df_zones = df_zones.groupby(['PU Zone']).agg({'trips_count':'sum'})
    df_zones = df_zones.orderBy('sum(trips_count)', ascending=False).take(10)
    return df_zones

# Top 10 ranked zones by profit
def t32_top10_profit(df_zones):
    # input: output of step 3.2
    df_zones = df_zones.groupby(['PU Zone']).agg({'average_unit_profit':'mean'})
    df_zones = df_zones.orderBy('avg(average_unit_profit)', ascending=False).take(10)
    
    return df_zones

# Top 10 ranked zones by passenger volume
def t32_top10_passenger(df_zones):
    # input: output of step 3.2
    df_zones = df_zones.groupby(['PU Zone']).agg({'total_passengers':'sum'})
    df_zones = df_zones.orderBy('sum(total_passengers)', ascending=False).take(10)
    
    return df_zones

In [None]:
# execute step 3.2
zones = t32_summarise_zones_pairs(graph)

top10_trips     = t32_top10_trips(zones)
top10_profit    = t32_top10_profit(zones)
top10_passenger = t32_top10_passenger(zones)

In [None]:
# we can use 'display()' or return a pandas DataFrame for 'pretty' output
top10_trips
# display(top10_trips)

Out[698]: [Row(PU Zone='Upper East Side South', sum(trips_count)=116958),
 Row(PU Zone='Upper East Side North', sum(trips_count)=99159),
 Row(PU Zone='Penn Station/Madison Sq West', sum(trips_count)=93102),
 Row(PU Zone='Midtown Center', sum(trips_count)=90077),
 Row(PU Zone='Murray Hill', sum(trips_count)=84096),
 Row(PU Zone='Midtown East', sum(trips_count)=81822),
 Row(PU Zone='Lincoln Square East', sum(trips_count)=77298),
 Row(PU Zone='Upper West Side South', sum(trips_count)=70849),
 Row(PU Zone='Lenox Hill West', sum(trips_count)=70787),
 Row(PU Zone='Clinton East', sum(trips_count)=70763)]

In [None]:
# we can use 'display()' return a pandas DataFrame for 'pretty' output
top10_profit

Out[699]: [Row(PU Zone='Queensboro Hill', avg(average_unit_profit)=259.10438358224854),
 Row(PU Zone='Baisley Park', avg(average_unit_profit)=207.86238469772005),
 Row(PU Zone='Eastchester', avg(average_unit_profit)=107.23434754068342),
 Row(PU Zone='Saint Michaels Cemetery/Woodside', avg(average_unit_profit)=73.12533203469059),
 Row(PU Zone='Van Cortlandt Village', avg(average_unit_profit)=67.24848896044932),
 Row(PU Zone='Crotona Park', avg(average_unit_profit)=62.131449263524736),
 Row(PU Zone='Riverdale/North Riverdale/Fieldston', avg(average_unit_profit)=62.01852099609552),
 Row(PU Zone='Flatbush/Ditmas Park', avg(average_unit_profit)=58.974842627440225),
 Row(PU Zone='South Jamaica', avg(average_unit_profit)=49.331971168811926),
 Row(PU Zone='Bellerose', avg(average_unit_profit)=48.46950206498166)]

In [None]:
# we can use 'display()' or return a pandas DataFrame for 'pretty' output
top10_passenger

Out[700]: [Row(PU Zone='Upper East Side South', sum(total_passengers)=170046.0),
 Row(PU Zone='Upper East Side North', sum(total_passengers)=144039.0),
 Row(PU Zone='Penn Station/Madison Sq West', sum(total_passengers)=137187.0),
 Row(PU Zone='Midtown Center', sum(total_passengers)=134869.0),
 Row(PU Zone='Murray Hill', sum(total_passengers)=123818.0),
 Row(PU Zone='Midtown East', sum(total_passengers)=119729.0),
 Row(PU Zone='Lincoln Square East', sum(total_passengers)=114769.0),
 Row(PU Zone='Clinton East', sum(total_passengers)=107130.0),
 Row(PU Zone='Upper West Side South', sum(total_passengers)=105908.0),
 Row(PU Zone='East Village', sum(total_passengers)=105581.0)]

## STEP 4 - Record the pipeline's execution time

Here, we recorded the execution time of:

1. the whole pipeline
2. the whole pipeline except step 1.2

on the two tables below, for all dataset sizes: `'S'`, `'M'`, `'L'`, `'XL'`, `'XXL'`, and data formats: `parquet` and `delta`.

We analysed the resulting execution times and comment on the effect of dataset size, dataset format and task complexity (with and without step 1.2) on pipeline performance.

In [None]:
# after developing our solution, it may be convenient to combine all of our functions in a single cell (at the start or end of the notebook)

# CHANGE the value of the following arguments to record the pipeline execution times for increasing dataset sizes
SIZE = 'XXL'
DATA_FORMAT = 'parquet'
# DATA_FORMAT = 'delta'
WITH_STEP_12 = False

# Load trips dataset
trips = init_trips(SIZE, DATA_FORMAT)


    Trips dataset loaded!
    ---
      Size: XXL
      Format: parquet
      Tables loaded: ['/FileStore/tables/taxi/parquet/tripdata_1_6_2019.parquet', '/FileStore/tables/taxi/parquet/tripdata_7_12_2019.parquet', '/FileStore/tables/taxi/parquet/tripdata_2020_21.parquet']
      Number of trips (dataset rows): 132,396,785
    


In [None]:
# run and record the resulting execution time shown by databricks (on the cell footer)

# IMPORTANT: this function calls all task functions in order of occurrence. For this code to run without errors, we have to load into memory all of the previous task-specific functions, even if we haven't implemented these yet.
pipeline(trips, with_step_12 = WITH_STEP_12)

Out[702]: [[Row(PU Zone='Upper East Side South', sum(trips_count)=5404917),
  Row(PU Zone='Upper East Side North', sum(trips_count)=4983505),
  Row(PU Zone='Midtown Center', sum(trips_count)=4756098),
  Row(PU Zone='Penn Station/Madison Sq West', sum(trips_count)=4243375),
  Row(PU Zone='Midtown East', sum(trips_count)=4202941),
  Row(PU Zone='Times Sq/Theatre District', sum(trips_count)=3749004),
  Row(PU Zone='Murray Hill', sum(trips_count)=3698612),
  Row(PU Zone='Clinton East', sum(trips_count)=3673679),
  Row(PU Zone='Lincoln Square East', sum(trips_count)=3639118),
  Row(PU Zone='JFK Airport', sum(trips_count)=3613878)],
 [Row(PU Zone='Newark Airport', avg(average_unit_profit)=80.76002443852747),
  Row(PU Zone='Arden Heights', avg(average_unit_profit)=39.63433605421933),
  Row(PU Zone='Rockaway Park', avg(average_unit_profit)=35.086417179761646),
  Row(PU Zone='Rossville/Woodrow', avg(average_unit_profit)=31.667475964730446),
  Row(PU Zone="Eltingville/Annadale/Prince's Bay", avg

_Table 1. Pipeline performance for `parquet` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |  2,898,033 |  15,571,166|  41,953,716 |  90,443,069 |  132,396,785 |
| execution time   (w/o 1.2)  | 10.02 s | 20.35 s | 31.56 m | 28.82 m | 22.47 m |
| execution time              | 17.43 s | 21.13 s | 28.69 m | 40.46 m | 1.32 h |
| sec / 1M records (w/o 1.2)  | 3.46 s  | 1.31 s  | 45.14 s  | 19.12 s  | 0.17 s  |
| sec / 1M records            | 6.01 s | 1.36 s  | 41.03 s  | 26.84 s  | 35.89 s  |

_Table 2. Pipeline performance for `delta` format._

| metric                      | S    | M    | L    | XL   | XXL  |
|-----------------------------|------|------|------|------|------|
| rows (M)                    |  2,898,033 |  15,571,166 |  41,953,716 |  90,443,069 |  132,396,785 |
| execution time   (w/o 1.2)  | 6.61 s | 10.84 s | 10.97 s | 35.46 s | 48.58 s |
| execution time              | 51.12 s | 27.84 s | 26.15 s | 35.46 s | 45.46 s |
| sec / 1M records (w/o 1.2)  | 2.28 s  | 0.7 s  | 0.26 s | 0.39 s  | 0.37 s  |
| sec / 1M records            | 17.64 s  | 1.79 s  | 0.62 s  | 0.39 s  | 0.34 s  |