# Process Data

In [1]:
%load_ext autoreload
# reload imported modules automatically (e.g. the project helpers in src/),
# so edits to them are picked up without restarting the kernel
%autoreload 2

In [2]:
import json
import os
import sys
import warnings
from datetime import datetime
from functools import partial
from glob import glob
from itertools import product
from typing import Dict, List

import duckdb
import numpy as np
import pandas as pd
import requests
from contexttimer import Timer
from tqdm.contrib import concurrent as concurrent_tq
from watermark import watermark

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
PROJ_ROOT = os.pardir
src_dir = os.path.join(PROJ_ROOT, "src")
# make the project modules in src/ importable; the guard keeps this cell
# idempotent so re-running it does not pile up duplicate sys.path entries
if src_dir not in sys.path:
    sys.path.append(src_dir)

In [4]:
%aimport clean
import clean as cl

%aimport file_utils
import file_utils as flut

%aimport pandas_utils
import pandas_utils as pu

%aimport read
import read

## About

### Objective

This notebook processes raw bike share trips data that will be used later in exploratory data analysis.

### Outputs

1. (56 files) Processed periodic (quarterly or monthly) bike share ridership data with a filename of the format `processed__trips_YYYY_(mm-or-qq)__YYYYmmdd_HHMMSS.parquet.gzip` containing the following columns
   - `trip_id`
     - trip identifier
   - `start_station_id`
     - station ID for station from which trip departs
   - `start_station_name`
     - station name for station from which trip departs
   - `end_station_id`
     - station ID for station at which trip ends
   - `end_station_name`
     - station name for station at which trip ends
   - `started_at`
     - trip departure timestamp
   - `ended_at`
     - trip arrival timestamp
   - `bike_id`
     - identifier for bike used in trip
   - `user_type`
     - type of bike share user (Casual or Annual)
   - `started_at_year`
     - year of trip departure (from departure timestamp)
   - `started_at_month`
     - month of trip departure
   - `started_at_day`
     - day of the month (1-31) of trip departure
   - `started_at_hour`
     - hour of trip departure
   - `started_at_minute`
     - minute of trip departure
   - `ended_at_year`
     - year of trip arrival
   - `ended_at_month`
     - month of trip arrival
   - `ended_at_day`
     - day of the month of trip arrival
   - `ended_at_hour`
     - hour of trip arrival
   - `ended_at_minute`
     - minute of trip arrival

### Assumptions

1. For the current business use-case, the current date is assumed to be in the month of April in 2023. The stations metadata was retrieved during February of 2024. So, it will be assumed that the metadata from February 2024 matches the station metadata from April 2023.

## User Inputs

In [5]:
# --- raw bike ridership ---
# columns removed from each trip table once the trip-duration filter has run
cols_to_drop = ["trip_duration"]
# minutes of slack added on top of each period's maximum allowed trip length
trip_buffer_mins = 5

# --- processed ---
# subset of the per-file summary keys returned by the ETL pipeline
proc_output_keys = (
    ['file', 'year', 'period']
    + ['raw', 'nans', 'proc']
    + ['raw_casual', 'proc_casual', 'raw_annual', 'proc_annual']
)

# --- timezone ---
my_timezone = 'America/Toronto'

In [6]:
data_dir = os.path.join(PROJ_ROOT, 'data')
# raw Toronto trip CSVs are read from the first directory; ETL outputs go to the second
raw_data_dir, processed_data_dir = [
    os.path.join(data_dir, *parts)
    for parts in (('raw', 'systems', 'toronto'), ('processed',))
]

# every raw trips CSV, in filename order
fpaths = sorted(glob(os.path.join(raw_data_dir, '*.csv')))

In [7]:
def run_parallel(
    inputs_product: product,
    fn,
    chunk_size: int=100,
    max_workers: int=12,
) -> List:
    """Call ``fn`` once per input tuple, spread over parallel workers.

    Parameters
    ----------
    inputs_product : product
        Iterable of argument tuples (e.g. an ``itertools.product``); each tuple
        is unpacked positionally into one call of ``fn``. It is consumed fully.
    fn : callable
        Function to apply. NOTE(review): it runs in worker processes via
        tqdm's ``process_map``, so it presumably must be picklable — confirm.
    chunk_size : int
        Number of tasks handed to a worker at a time.
    max_workers : int
        Maximum number of parallel workers.

    Returns
    -------
    list
        The results of ``fn``, one per input tuple (empty for empty input).
    """
    iterables = list(inputs_product)
    if not iterables:
        # process_map needs at least one iterable to size its progress bar
        return []
    outputs = list(
        concurrent_tq.process_map(
            fn,
            # transpose the argument tuples into one iterable per parameter
            *zip(*iterables),
            max_workers=max_workers,
            chunksize=chunk_size,
        )
    )
    return outputs


def run_sql_query(query: str, verbose: bool=False) -> pd.DataFrame:
    """Execute a DuckDB SQL query and return its result as a pandas DataFrame.

    Table names in ``query`` may refer to pandas DataFrames that are in scope as
    Python variables (DuckDB picks them up by name). FutureWarnings raised while
    running the query or converting its result are silenced. With
    ``verbose=True`` the number of returned rows is printed.
    """
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", FutureWarning)
        relation = duckdb.sql(query)
        result = relation.df()
    if verbose:
        print(f"Query returned {len(result):,} rows")
    return result

In [8]:
def _parse_trips_period_year(fname: str) -> tuple:
    """Return the ``(period, year)`` strings encoded in a raw trips CSV filename.

    2018 filenames contain ``Ridership_Q<q> <YYYY>.<ext>`` and yield a period of
    ``"Q<q>"``. All other filenames are expected to end in ``<YYYY>-<pp>.csv``,
    where ``<pp>`` is a 2-character period (e.g. ``01`` or ``Q1``).
    """
    if "2018" not in fname:
        period = fname[-6:].replace('.csv', '')
        year = fname[-11:].replace('.csv', '').split('-')[0]
        return period, year
    quarter, year = fname.split("Ridership_Q")[-1].split(" ")
    return "Q" + quarter, year.split(".")[0]


def _max_trip_duration_secs(year: int, period: str, buffer_mins: int) -> int:
    """Return the longest trip duration (in seconds) kept for a data period.

    The limit follows the maximum trip length in force at the time (30 min up
    to June 2021, 45 min from July 2021 to March 2023, 90 min from April 2023),
    plus ``buffer_mins`` minutes of slack.

    Raises
    ------
    ValueError
        If ``year`` is later than 2023, for which no limit is configured.
    """
    if year <= 2020:
        limit_mins = 30
    elif year == 2021:
        limit_mins = 30 if period in ['01', '02', '03', '04', '05', '06'] else 45
    elif year == 2022:
        limit_mins = 45
    elif year == 2023:
        limit_mins = 45 if period in ['01', '02', '03'] else 90
    else:
        raise ValueError(f"No maximum trip duration configured for year {year}")
    return (limit_mins + buffer_mins) * 60


def run_trips_etl_pipeline(
    f: str,
    processed_data_dir: str,
    cols_to_drop: List[str],
    buffer_mins: int=5,
    min_duration_secs: int=60,
) -> Dict[str, object]:
    """Run ETL to load, process and export one raw bike share ridership file.

    Steps:
    1. read the CSV file, parsing the start/end timestamps
    2. clean column names (lowercase, runs of whitespace -> "_")
    3. standardize column names across years (2018 uses different names)
    4. keep trips lasting more than ``min_duration_secs`` and no longer than the
       maximum allowed for the file's period, then drop ``cols_to_drop``
    5. drop trips missing a start or end station ID
    6. clean the start/end station names
    7. add year/month/day/hour/minute attributes of the start and end times
    8. export the processed data and the raw data to ``processed_data_dir``
       (prefixes ``processed__trips_<year>_<period>`` and
       ``raw__trips_<year>_<period>``)

    Parameters
    ----------
    f : str
        Path to the raw trips CSV file.
    processed_data_dir : str
        Directory the processed and raw outputs are written to.
    cols_to_drop : List[str]
        Columns removed after the trip-duration filter.
    buffer_mins : int
        Minutes added to the maximum allowed trip length.
    min_duration_secs : int
        Trips lasting this many seconds or fewer are dropped.

    Returns
    -------
    Dict[str, object]
        The raw (``raw_data``) and processed (``data``) DataFrames, the file's
        name/year/period, and row, user-type and bike counts at each stage.
        ``nans`` is the number of duration-valid trips dropped for a missing
        station ID; ``dup_trips`` counts duplicated processed trips.

    Raises
    ------
    ValueError
        If the file's year has no configured maximum trip length.
    """
    fname = os.path.basename(f)
    # parse from the filename only, so directory names cannot affect the result
    period, year = _parse_trips_period_year(fname)

    # EXTRACT and TRANSFORM
    df = (
        # 1. read the CSV file
        read.read_csv_file(*read.get_read_csv_inputs(f), "%m/%d/%Y %H:%M")
        # 2. clean column names
        .rename(
            mapper=lambda x: "_".join(x.lower().split()).replace("__", "_"),
            axis='columns',
        )
    )
    # 3. standardize column names across different years
    if year != "2018":
        df = df.rename(columns={"start_time": "started_at", "end_time": "ended_at"})
    else:
        df = (
            df
            .rename(
                columns={
                    "to_station_name": "end_station_name",
                    "to_station_id": "end_station_id",
                    "from_station_name": "start_station_name",
                    "from_station_id": "start_station_id",
                    "trip_start_time": "started_at",
                    "trip_stop_time": "ended_at",
                    "trip_duration_seconds": "trip_duration",
                }
            )
            # 2018 data has no bike ID column
            .assign(bike_id=lambda d: None)
        )

    # 4. remove trips that are too short or longer than allowed
    max_duration = _max_trip_duration_secs(int(year), period, buffer_mins)
    df_valid = (
        df
        # (re)compute trip duration from the timestamps
        .assign(
            trip_duration=lambda d: (
                d['ended_at'] - d['started_at']
            ).dt.total_seconds()
        )
        .query(
            f"(trip_duration > {min_duration_secs}) & (trip_duration <= {max_duration})"
        )
        .drop(columns=cols_to_drop)
    )

    # 5. drop trips with missing values in start & end station ID columns
    df_no_nans = (
        df_valid
        .dropna(subset=['start_station_id'])
        .dropna(subset=['end_station_id'])
    )

    # 6. and 7. clean station names and extract datetime attributes
    df_proc = (
        df_no_nans
        # 6. clean station names
        .pipe(cl.clean_status_station_names, ['start_station_name'])
        .pipe(cl.clean_status_station_names, ['end_station_name'])
        # 7. get datetime attributes
        .assign(
            started_at_year=lambda d: d["started_at"].dt.year,
            started_at_month=lambda d: d["started_at"].dt.month,
            started_at_day=lambda d: d["started_at"].dt.day,
            started_at_hour=lambda d: d["started_at"].dt.hour,
            started_at_minute=lambda d: d["started_at"].dt.minute,
            ended_at_year=lambda d: d["ended_at"].dt.year,
            ended_at_month=lambda d: d["ended_at"].dt.month,
            ended_at_day=lambda d: d["ended_at"].dt.day,
            ended_at_hour=lambda d: d["ended_at"].dt.hour,
            ended_at_minute=lambda d: d["ended_at"].dt.minute,
        )
        .astype({"bike_id": pd.Int64Dtype()})
    )

    # Post-Processing - flag duplicated trips (reported as a count below)
    df_dup_trips = df_proc.loc[
        df_proc.duplicated(
            subset=[
                "trip_id",
                "start_station_id",
                "start_station_name",
                "started_at",
                "end_station_id",
                "end_station_name",
                "ended_at",
            ]
        )
    ]

    # LOAD
    # 8. export processed and raw data to disk
    df_proc.pipe(flut.load, processed_data_dir, f"processed__trips_{year}_{period}")
    df.pipe(flut.load, processed_data_dir, f"raw__trips_{year}_{period}")
    return {
        "raw_data": df,
        "data": df_proc,
        'file': fname,
        'year': year,
        'period': period,
        'buffer_mins': buffer_mins,
        'raw': len(df),
        'raw_casual': len(df.query("user_type == 'Casual Member'")),
        'raw_annual': len(df.query("user_type == 'Annual Member'")),
        # trips removed by step 5 only (not by the duration filter of step 4)
        "nans": len(df_valid) - len(df_no_nans),
        'valid_trip_length': len(df_valid),
        'proc': len(df_proc),
        'proc_casual': len(df_proc.query("user_type == 'Casual Member'")),
        'proc_annual': len(df_proc.query("user_type == 'Annual Member'")),
        'raw_bikes': df['bike_id'].dropna().nunique(),
        'proc_bikes': df_proc['bike_id'].dropna().nunique(),
        'dup_trips': len(df_dup_trips),
    }

## Get and Process Bike Share Trips Data

### Load and Process the Raw Bike Share Trips Data

Load the raw (monthly or quarterly) bike share trips data and perform the following data processing tasks

1. load CSV file, correctly parsing the start and end `datetime` for each trip
2. clean column names
3. standardize column names across different years
   - for 2019 onwards, rename `start_time` to `started_at` and `end_time` to `ended_at`
   - for 2018, rename
     - `to_station_name` to `end_station_name`
     - `to_station_id` to `end_station_id`
     - `from_station_name` to `start_station_name`
     - `from_station_id` to `start_station_id`
     - `trip_start_time` to `started_at`
     - `trip_stop_time` to `ended_at`
     - `trip_duration_seconds` to `trip_duration`
4. remove trips that last one minute or less, or longer than the maximum allowed duration (plus a buffer of `trip_buffer_mins` minutes)
   - for 2018, 2019, 2020 and 2021 (January to June, inclusive)
     - max trip length allowed is 30 minutes
   - for 2021 (July to December), 2022 and 2023 (January to March, inclusive)
     - max trip length allowed is 45 minutes
   - for 2023 (April to December)
     - max trip length allowed is 90 minutes
5. drop trips with missing values in the start and end station ID columns
   - drop trips with missing values in the start ID or end ID column
   - this step is necessary since part of the business use-case for this project requires extracting insights among the top-performing and other stations, so we'll need to aggregate columns based on their ID
6. clean the start and end station names
   - this is needed in order to get the latitude and longitude for each station, which are then used to get the neighbourhood containing each station
   - the neighbourhood is needed in order to explore ridership data from a geospatial perspective
7. extract `datetime` attributes (year, month, day, hour, minute)
8. export processed data to disk

In [10]:
%%time
outputs = run_parallel(
    product(
        fpaths,
        [processed_data_dir],
        [cols_to_drop],
        [trip_buffer_mins],
    ),
    run_trips_etl_pipeline,
    chunk_size=4,
)

100%|██████████| 56/56 [04:21<00:00,  4.67s/it]  


CPU times: user 8.65 s, sys: 11.3 s, total: 19.9 s
Wall time: 4min 22s


In [10]:
# processed and raw parquet exports written by the ETL, each sorted by filename
fpaths_processed, fpaths_processed_raw = [
    sorted(glob(os.path.join(processed_data_dir, pattern)))
    for pattern in ('processed__*.parquet.gzip', 'raw__*.parquet.gzip')
]

In [11]:
%%time
for c in ['start', 'end']:
    with pd.option_context('display.max_rows', None):
        print(c)
        col = f"{c}_station_name"
        df_combo = (
            pd.read_parquet(fpaths_processed, columns=[col])
            .drop_duplicates(subset=[col])
        )
        display(df_combo.query(f"{col}.str.contains('ern Ave')"))
        display(df_combo.query(f"{col}.str.contains('Bishop')"))

start


Unnamed: 0,start_station_name
645866,Queen St E Eastern Ave
787327,Morse St Eastern Ave
788364,Eastern Ave / Winnifred Ave
2741664,Sackville St Eastern Ave
2800250,Gerrard St E / Malvern Ave


Unnamed: 0,start_station_name
32,Bathurst St / Queens Quay Billy Bishop Airport


end


Unnamed: 0,end_station_name
636015,Queen St E Eastern Ave
785209,Morse St Eastern Ave
794089,Eastern Ave / Winnifred Ave
2739532,Sackville St Eastern Ave
2797543,Gerrard St E / Malvern Ave


Unnamed: 0,end_station_name
46,Bathurst St / Queens Quay Billy Bishop Airport


CPU times: user 7.53 s, sys: 1.55 s, total: 9.09 s
Wall time: 7.47 s


**Notes**

1. A [45-minute trip allowance for Annual members started in July 2021](https://bikesharetoronto.com/news/annual-45-memberships/). For this reason, `max_duration` switches from allowing a maximum trip length of 30 minutes to 45 minutes in July of 2021.
2. [The 90-minute trip allowance came into effect on April 3, 2023](https://bikesharetoronto.com/news/recommended-pricing-changes-2023/). Due to this, the 45-minute maximum allowable trip length changes to 90 minutes in April of 2023. The raw data only shows the member type (Casual and Annual), but it does not indicate which trips (by Annual or Casual members or both) were subject to a 90-minute duration. During data processing for the last nine months of 2023 (April to December), the logic used to filter out invalid trips based on length (i.e. to filter out trips longer than 90 minutes) was simply applied to all trips. This logic is likely over-simplifying how the 90-minute threshold should be applied.

### Fraction of Missing Values Per Month

Get the fraction of raw and processed bike share trips that are missing values in the starting and ending station name and bike ID columns

In [12]:
%%time
nan_cols = [
    'trip_id',
    'started_at',
    'start_station_name',
    'ended_at',
    'end_station_name',
    'bike_id',
    'user_type',
]
dfs_nans_sumaries = []
# For each (raw, processed) file pair, count trips, member types and missing
# values per (year, month) of trip start, in both versions of the data.
for o_r, o_p in zip(fpaths_processed_raw, fpaths_processed):
    # time loading + aggregating this file pair (printed after the block)
    with Timer() as t:
        # processed files already carry the trip-start year/month attributes
        df_proc = (
            pd.read_parquet(o_p, columns=nan_cols+['started_at_year', 'started_at_month'])
            .drop(columns=['started_at', 'ended_at'])
            .convert_dtypes()
        )
        # raw files do not, so derive the trip-start year/month here
        df_raw = (
            pd.read_parquet(o_r, columns=nan_cols)
            .assign(
                started_at_year=lambda df: df['started_at'].dt.year,
                started_at_month=lambda df: df['started_at'].dt.month,
            )
            .drop(columns=['started_at', 'ended_at'])
            .convert_dtypes()
        )
        # NOTE(review): substring test on the whole path (it would also match
        # "2018" inside an export timestamp). Presumably targets the 2018 files,
        # whose bike_id column is entirely empty, so convert_dtypes cannot infer
        # an integer type; force nullable Int64 for the SQL below — confirm.
        if '2018' in o_r and '2018' in o_p:
            df_proc = df_proc.astype({"bike_id": pd.Int64Dtype()})
            df_raw = df_raw.astype({"bike_id": pd.Int64Dtype()})
        # df_proc / df_raw are referenced by name in the SQL and read from the
        # Python variables by DuckDB. The frac_* columns are percentages of
        # distinct trips (multiplied by 100), despite the "frac" prefix.
        query = """
                WITH t1 AS (
                    -- get monthly missing values in processed station name and bike ID columns
                    SELECT started_at_year,
                           started_at_month,
                           SUM(IF(start_station_name IS NULL, 1, 0)) AS start_nans_proc,
                           SUM(IF(end_station_name IS NULL, 1, 0)) AS end_nans_proc,
                           SUM(IF(bike_id IS NULL, 1, 0)) AS bike_id_nans_proc,
                           COUNT(DISTINCT(trip_id)) AS proc_trips,
                           SUM(CASE WHEN user_type = 'Annual Member' THEN 1 ELSE 0 END) AS proc_annual,
                           SUM(CASE WHEN user_type = 'Casual Member' THEN 1 ELSE 0 END) AS proc_casual
                    FROM df_proc
                    GROUP BY ALL
                ),
                t2 AS (
                    -- get monthly missing values in raw station name and bike ID columns
                    SELECT started_at_year,
                           started_at_month,
                           SUM(IF(start_station_name IS NULL, 1, 0)) AS start_nans_raw,
                           SUM(IF(end_station_name IS NULL, 1, 0)) AS end_nans_raw,
                           SUM(IF(bike_id IS NULL, 1, 0)) AS bike_id_nans_raw,
                           COUNT(DISTINCT(trip_id)) AS raw_trips,
                           SUM(CASE WHEN user_type = 'Annual Member' THEN 1 ELSE 0 END) AS raw_annual,
                           SUM(CASE WHEN user_type = 'Casual Member' THEN 1 ELSE 0 END) AS raw_casual
                    FROM df_raw
                    GROUP BY ALL
                ),
                -- rename processed year and month columns
                t3 AS (
                    SELECT started_at_year AS year,
                           started_at_month AS month,
                           start_nans_proc,
                           end_nans_proc,
                           bike_id_nans_proc,
                           proc_trips,
                           proc_annual,
                           proc_casual
                    FROM t1
                ),
                -- rename raw year and month columns
                t4 AS (
                    SELECT started_at_year AS year,
                           started_at_month AS month,
                           start_nans_raw,
                           end_nans_raw,
                           bike_id_nans_raw,
                           raw_trips,
                           raw_annual,
                           raw_casual
                    FROM t2
                ),
                -- calculate fraction of missing values in raw & processed columns
                t5 AS (
                    SELECT *,
                           100*start_nans_raw/raw_trips AS frac_start_nans_raw,
                           100*start_nans_proc/proc_trips AS frac_start_nans_proc,
                           100*end_nans_raw/raw_trips AS frac_end_nans_raw,
                           100*end_nans_proc/proc_trips AS frac_end_nans_proc,
                           100*bike_id_nans_raw/raw_trips AS frac_bike_id_nans_raw,
                           100*bike_id_nans_proc/proc_trips AS frac_bike_id_nans_proc
                    FROM t3
                    LEFT JOIN t4 USING(year, month)
                )
                -- drop unwanted columns
                SELECT * EXCLUDE(
                    start_nans_proc,
                    end_nans_proc,
                    bike_id_nans_raw,
                    bike_id_nans_proc
                )
                FROM t5
                """
        df_nans_sumary = run_sql_query(query).convert_dtypes()
    print(f"Aggregated missing values for {os.path.basename(o_r)} in {t.elapsed:.3f}s")
    dfs_nans_sumaries.append(df_nans_sumary)
# stack the per-file summaries into one table ordered by year and month
df_nans_query = pd.concat(dfs_nans_sumaries).sort_values(
    ['year', 'month'], ignore_index=True
)
pu.show_nans_dtypes(df_nans_query, show_transpose=True)

# show member counts and missing-value percentages (raw counts hidden)
frac_nan_cols = [
    'frac_start_nans_raw',
    'frac_start_nans_proc',
    'frac_end_nans_raw',
    'frac_end_nans_proc',
    'frac_bike_id_nans_raw',
    'frac_bike_id_nans_proc',
]
display(
    df_nans_query
    .drop(columns=['raw_trips', 'proc_trips', 'start_nans_raw', 'end_nans_raw'])
    .style.apply(
        pu.highlight_multiple_columns_row_greater_than,
        cols_to_check=frac_nan_cols,
        axis=None,
    )
)

Aggregated missing values for raw__trips_2018_Q1__20240229_142552.parquet.gzip in 0.475s
Aggregated missing values for raw__trips_2018_Q2__20240229_142652.parquet.gzip in 1.270s
Aggregated missing values for raw__trips_2018_Q3__20240229_142812.parquet.gzip in 1.954s
Aggregated missing values for raw__trips_2018_Q4__20240229_142841.parquet.gzip in 1.024s
Aggregated missing values for raw__trips_2019_Q1__20240229_142553.parquet.gzip in 0.360s
Aggregated missing values for raw__trips_2019_Q2__20240229_142705.parquet.gzip in 1.033s
Aggregated missing values for raw__trips_2019_Q3__20240229_142839.parquet.gzip in 1.740s
Aggregated missing values for raw__trips_2019_Q4__20240229_142910.parquet.gzip in 0.774s
Aggregated missing values for raw__trips_2020_01__20240229_142542.parquet.gzip in 0.190s
Aggregated missing values for raw__trips_2020_02__20240229_142554.parquet.gzip in 0.175s
Aggregated missing values for raw__trips_2020_03__20240229_142607.parquet.gzip in 0.174s
Aggregated missing va

Unnamed: 0,year,month,proc_trips,proc_annual,proc_casual,start_nans_raw,end_nans_raw,raw_trips,raw_annual,raw_casual,frac_start_nans_raw,frac_start_nans_proc,frac_end_nans_raw,frac_end_nans_proc,frac_bike_id_nans_raw,frac_bike_id_nans_proc
missing,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
dtype,Int32,Int32,Int64,Int64,Int64,Int64,Int64,Int64,Int64,Int64,Float64,Float64,Float64,Float64,Float64,Float64


Unnamed: 0,year,month,proc_annual,proc_casual,raw_annual,raw_casual,frac_start_nans_raw,frac_start_nans_proc,frac_end_nans_raw,frac_end_nans_proc,frac_bike_id_nans_raw,frac_bike_id_nans_proc
0,2018,1,42173,1170,42469,1390,0.0,0.0,0.0,0.0,100.0,100.0
1,2018,2,46930,1949,47276,2455,0.0,0.0,0.0,0.0,100.0,100.0
2,2018,3,77955,5150,78564,6405,0.0,0.0,0.0,0.0,100.0,100.0
3,2018,4,81341,9384,82194,12589,0.0,0.0,0.0,0.0,100.0,100.0
4,2018,5,158734,37819,160989,51761,0.0,0.0,0.0,0.0,100.0,100.0
5,2018,6,183737,50883,186463,64374,0.0,0.0,0.0,0.0,100.0,100.0
6,2018,7,211213,57274,215835,70481,0.0,0.0,0.0,0.0,100.0,100.0
7,2018,8,205663,58875,209770,71449,0.0,0.0,0.0,0.0,100.0,100.0
8,2018,9,203667,39453,207789,47212,0.0,0.0,0.0,0.0,100.0,100.0
9,2018,10,158462,13397,160326,15553,0.0,0.0,0.0,0.0,100.0,100.0


CPU times: user 55.9 s, sys: 3.65 s, total: 59.5 s
Wall time: 35.3 s


**Notes**

1. For the current business use case (see the project's `README.md` for details), the station ID column is needed. So, during the data processing above, all trips with missing values in these columns were dropped. The number of trips left after dropping these trips is shown in the `proc_trips` column. The number of trips in the original data, before any processing, is shown in the `raw_trips` column.
2. Before dropping trips with missing values in the `start_station_id` and `end_station_id` columns (named `from_station_id` and `to_station_id` in the 2018 raw data), the data was filtered to remove trips that were longer than the allowed duration. Trips with missing values in these two columns were only dropped after filtering out these longer-than-allowed trips. As mentioned at the end of the previous section, there are limitations to how effectively this filter was applied from April to December of 2023.

**Observations**

1. The fraction of processed trips with missing values in at least one of the start and end station ID columns has been slowly increasing on a monthly basis. It went above 10% in June of 2023 and has continued to increase since then.
2. There appears to be an error in how the raw data has attributed trips to Casual (`raw_casual`) or Annual (`raw_annual`) members for the last nine months of 2023. Starting in June, the number of trips by Annual users drops, while the number of trips by Casual users continues to increase. Annual ridership is recorded as zero for the last four months of the year. It is unlikely that this is true since the annual membership is still available on the [Bike Share Toronto membership sign-up web page](https://bikesharetoronto.com/pricing/). This is likely due to a data recording error. It does not affect data analyzed for the current business use-case.

## Run Sanity Checks on Processed Bike Share Trips Data

In this section, a few sanity checks are run to verify the validity of the data processing used above. The bike share

1. manager (Toronto Parking Authority, TPA) releases [blog posts reviewing the previous year's performance](https://bikesharetoronto.com/ride-toronto/#news)
2. operator (Shift Transit) publishes [blog posts summarizing operational performance](https://shifttransit.net/blog/)

In some cases, monthly estimates are also released. As a sanity check, the totals listed in these blog posts are compared to the totals found in the raw and processed data.

Note that, for the purposes of a sanity check, bike share departures (also referred to as bike demand or trips leaving a bike share station) are considered. Bike share arrivals (also called dock demand or trips ending at a station) are not used.

### Compare Raw & Processed versus Published Yearly Ridership

Get the published yearly bike share ridership from 2018 to 2023 inclusive

In [13]:
# published yearly ridership totals (sources listed in the notes below)
yearly_published = pd.DataFrame(
    {
        'year': [2018, 2019, 2020, 2021, 2022, 2023],
        "reported_value": [
            2_000_000,
            2_400_000,
            2_900_000,
            3_575_000,
            4_600_000,
            5_700_000,
        ],
    }
)

**Notes**

1. Published totals for yearly ridership are available below
   - [2018](https://bikesharetoronto.com/news/2018-highlights/) (~2,000,000 trips)
   - [2019](https://bikesharetoronto.com/news/2019-milestones/) (2,400,000 trips)
   - [2020](https://bikesharetoronto.com/news/2020-year-in-review/) (2,900,000 trips)
   - [2021](https://bikesharetoronto.com/news/2021-review/) (3,575,000 trips)
   - [2022](https://bikesharetoronto.com/news/a-look-back-on-2022/) (4,600,000 trips)
   - [2023](https://bikesharetoronto.com/news/looking-back-on-2023/) (5,700,000 trips)

Get the raw and **processed** yearly ridership totals over 2018 to 2023 inclusive, and compare to published values

In [14]:
%%time
# Sum the monthly raw and processed trip counts from df_nans_query into yearly
# totals and compare each with the published yearly value (yearly_published).
# A positive pct_diff_* means the data total is below the published value.
query = """
        WITH t1 AS (
            -- get processed yearly total
            SELECT year,
                   SUM(proc_trips) AS proc,
                   SUM(raw_trips) AS raw_total
            FROM df_nans_query
            GROUP BY year
        )
        -- compare processed and published yearly totals and calculate percent difference
        SELECT year,
               proc,
               raw_total AS raw,
               reported_value,
               100*(reported_value-raw_total)/reported_value AS pct_diff_raw,
               100*(reported_value-proc)/reported_value AS pct_diff_proc
        FROM t1
        LEFT JOIN yearly_published USING (year)
        ORDER BY year
        """
df_yearly = run_sql_query(query).convert_dtypes()
# per-column missing-value counts and dtypes of the comparison table
pu.show_nans_dtypes(df_yearly, show_transpose=True)
# NOTE(review): highlighting rules live in pandas_utils (not visible here)
pu.highlight_multiple_columns_v2(
    df_yearly, ['raw', 'pct_diff_raw', 'pct_diff_proc']
)

Unnamed: 0,year,proc,raw,reported_value,pct_diff_raw,pct_diff_proc
missing,0,0,0,0,0,0
dtype,Int32,Int64,Int64,Int64,Float64,Float64


Unnamed: 0,year,proc,raw,reported_value,pct_diff_raw,pct_diff_proc
0,2018,1830573,1922955,2000000,3.85225,8.47135
1,2019,2326479,2439517,2400000,-1.646542,3.063375
2,2020,2739833,2911059,2900000,-0.381345,5.523
3,2021,3435503,3575169,3575000,-0.004727,3.902014
4,2022,4514466,4620469,4600000,-0.444978,1.859435
5,2023,5621975,5713141,5700000,-0.230544,1.36886


CPU times: user 19.7 ms, sys: 361 µs, total: 20.1 ms
Wall time: 16.1 ms


**Notes**

1. For the purpose of this sanity check, the yearly totals include all 12 months of 2023. For the business use-case, insights can only be extracted from the months of January, February and March of 2023.

**Observations**

1. The number of raw and **processed** yearly trips are in agreement with the published numbers (to within 4% for raw and 9% for processed totals).

### Compare Raw & Processed versus Published Monthly Ridership for April - Dec. 2022

Get the [published monthly bikeshare ridership from 2022](https://bikesharetoronto.com/news/a-look-back-on-2022/) (see the **2022 RIDERSHIP MILESTONES** infographic)

In [15]:
# Published cumulative (year-to-date) 2022 ridership at the end of each month.
# Each record previously repeated the "month" key, so the month name was
# silently overwritten by the month number; the name now has its own column.
monthly_published_2022 = (
    pd.DataFrame.from_records(
        [
            {"month": 4, "month_name": "April", "reported_value": 500_000},
            {"month": 5, "month_name": "May", "reported_value": 1_000_000},
            {"month": 6, "month_name": "June", "reported_value": 1_500_000},
            {"month": 7, "month_name": "July", "reported_value": 2_300_000},
            {"month": 8, "month_name": "August", "reported_value": 3_000_000},
            {"month": 9, "month_name": "September", "reported_value": 3_600_000},
            {"month": 10, "month_name": "October", "reported_value": 4_000_000},
            {"month": 11, "month_name": "November", "reported_value": 4_400_000},
            {"month": 12, "month_name": "December", "reported_value": 4_600_000},
        ]
    )
)

Get the cumulative (year-to-date) monthly ridership totals for April to December of 2022 inclusive, and compare them to the published values

In [16]:
%%time
query = """
        WITH t1 AS (
            -- get raw monthly totals
            SELECT year,
                   month,
                   SUM(proc_trips) AS proc_total,
                   SUM(raw_trips) AS raw_total,
                   100*GREATEST(SUM(start_nans_raw), SUM(end_nans_raw))/SUM(raw_trips) AS frac_nans_raw
            FROM df_nans_query
            WHERE year = 2022
            GROUP BY ALL
        ),
        t2 AS (
            -- get processed monthly totals
            SELECT * EXCLUDE(proc_total, raw_total),
                   proc_total,
                   raw_total,
                   SUM(proc_total) OVER(ORDER BY month) AS proc_total_run,
                   SUM(raw_total) OVER(ORDER BY month) AS raw_total_run
            FROM t1
            LEFT JOIN monthly_published_2022 USING (month)
            ORDER BY month
        ),
        -- compare processed and published monthly totals and calculate percent difference
        t3 AS (
            SELECT year,
                   month,
                   proc_total_run,
                   raw_total_run,
                   reported_value,
                   100*(reported_value-proc_total_run)/reported_value AS pct_diff_proc,
                   100*(reported_value-raw_total_run)/reported_value AS pct_diff_raw,
                   frac_nans_raw
            FROM t2
            WHERE month NOT IN (1, 2, 3)
        )
        SELECT *
        FROM t3
        """
df_monthly_2022 = run_sql_query(query).convert_dtypes()
pu.show_nans_dtypes(df_monthly_2022, show_transpose=True)
pu.highlight_multiple_columns_v2(
    df_monthly_2022,
    ['year', 'month', 'pct_diff_proc', 'pct_diff_raw'],
)

Unnamed: 0,year,month,proc_total_run,raw_total_run,reported_value,pct_diff_proc,pct_diff_raw,frac_nans_raw
missing,0,0,0,0,0,0,0,0
dtype,Int32,Int32,Int64,Int64,Int64,Float64,Float64,Float64


Unnamed: 0,year,month,proc_total_run,raw_total_run,reported_value,pct_diff_proc,pct_diff_raw,frac_nans_raw
0,2022,4,512590,521633,500000,-2.518,-4.3266,1.849382
1,2022,5,1003444,1026559,1000000,-0.3444,-2.6559,2.421345
2,2022,6,1594223,1632204,1500000,-6.281533,-8.8136,3.03313
3,2022,7,2255626,2312512,2300000,1.929304,-0.544,3.377588
4,2022,8,2942693,3017662,3000000,1.910233,-0.588733,4.647947
5,2022,9,3531842,3620479,3600000,1.893278,-0.568861,5.367466
6,2022,10,4021857,4120230,4000000,-0.546425,-3.00575,5.588783
7,2022,11,4336989,4440459,4400000,1.432068,-0.919523,6.147788
8,2022,12,4514466,4620469,4600000,1.859435,-0.444978,6.275762


CPU times: user 15.7 ms, sys: 10.4 ms, total: 26.1 ms
Wall time: 19.5 ms


**Observations**

1. The cumulative number of raw and **processed** trips, over the period of April to December of 2022, are in close agreement with the published numbers (to within 9% for raw and 7% for processed totals).

### Compare Daily Processed & Published Ridership for Wednesdays in August 2019

Get the [published weekday bikeshare ridership from August of 2019](https://bikesharetoronto.com/news/2019-milestones/) (see the **Free Ride Wednesdays Brought to you by CAA** section)

In [17]:
# Single-row table holding the published August 2019 Wednesday ridership
# figure (see the linked article above), keyed by year and month so it can
# be joined onto the processed daily totals
weekday_published_2019 = pd.DataFrame(
    {
        'year': [2019],
        'month': [8],
        'month_name': ['August'],
        'reported_value': [21_220],
    }
)

Daily trip totals for the Wednesdays of August 2019 are shown below, together with each day's rank among all days of that month (rank 1 is the busiest day of the month)

In [18]:
%%time
df_proc_2019 = pd.read_parquet(fpaths_processed[4:8]).convert_dtypes()
query = """
        WITH t1 AS (
            -- get processed trips from August of 2019
            SELECT *,
                   datepart('week', started_at) AS started_at_week_of_year,
                   dayname(started_at) AS started_at_weekday
            FROM df_proc_2019
            WHERE started_at_month = 8
        ),
        t2 AS (
            -- get processed total by day of week for all days in the month of August
            -- and add ranks based on highest-to-lowest daily ridership
            SELECT started_at_year AS year,
                   started_at_week_of_year AS week_of_year,
                   started_at_month AS month,
                   started_at_weekday AS weekday,
                   COUNT(DISTINCT(trip_id)) AS trips,
                   ROW_NUMBER() OVER(ORDER BY trips DESC) AS rank
            FROM t1
            WHERE started_at_year = 2019
            GROUP BY ALL
            ORDER BY trips DESC
        )
        -- calculate percent difference between published and processed totals
        -- for best-performing Wednesday
        SELECT * EXCLUDE(month),
               100*(reported_value-trips)/reported_value AS pct_diff
        FROM t2
        LEFT JOIN weekday_published_2019 USING (year, month)
        WHERE weekday = 'Wednesday'
        """
df_agg_2019 = run_sql_query(query).convert_dtypes()
pu.show_nans_dtypes(df_agg_2019, show_transpose=True)
pu.highlight_conditionally(df_agg_2019, 'rank', 1)

Unnamed: 0,year,week_of_year,weekday,trips,rank,month_name,reported_value,pct_diff
missing,0,0,0,0,0,0,0,0
dtype,Int32,Int64,string[python],Int64,Int64,string[python],Int64,Float64


Unnamed: 0,year,week_of_year,weekday,trips,rank,month_name,reported_value,pct_diff
0,2019,33,Wednesday,20288,1,August,21220,4.392083
1,2019,35,Wednesday,18207,2,August,21220,14.198869
2,2019,34,Wednesday,17354,3,August,21220,18.218662
3,2019,32,Wednesday,16819,4,August,21220,20.739868


CPU times: user 2.24 s, sys: 539 ms, total: 2.78 s
Wall time: 1.5 s


**Observations**

1. The top four days in terms of **processed** bike share ridership during August of 2019 were all Wednesdays (see the rank column, which ranks every day of the month), as expected. The top-ranking Wednesday (highlighted in yellow) has processed ridership that agrees with the published total to within 5%.

### Compare Daily Processed & Published Ridership for June 5, 2021

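Get the published daily bike share ridership for June 5, 2021 (**FIXME**: this sentence was copied from the August 2019 comparison above and still links to the [2019 milestones article](https://bikesharetoronto.com/news/2019-milestones/) and its **Free Ride Wednesdays Brought to you by CAA** section; replace the link with the source of the 27,000-trip figure for June 5, 2021)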

In [19]:
# Single-row table holding the published ridership figure for June 5, 2021,
# keyed by year, month name and day of month for joining onto processed totals
published_june_5_2021 = pd.DataFrame(
    {
        'year': [2021],
        'month': ['June'],
        'day': [5],
        'reported_value': [27_000],
    }
)

Aggregated daily trips are shown below for June 5th, 2021

In [20]:
%%time
df_june_2021 = pd.read_parquet(fpaths_processed[25]).convert_dtypes()
query = """
        WITH t1 AS (
            -- get processed trips from June 5, 2021
            SELECT *,
                   datepart('year', started_at) AS year,
                   monthname(started_at) AS month,
                   day(started_at) AS day,
                   dayname(started_at) AS day_name
            FROM df_june_2021
            WHERE month = 'June'
            AND day = 5
        ),
        -- get processed total for all trips on June 5
        t2 AS (
            SELECT year,
                   month,
                   day,
                   day_name,
                   COUNT(DISTINCT(trip_id)) AS trips
            FROM t1
            GROUP BY ALL
        )
        -- calculate percent difference between published and processed totals
        -- for June 5
        SELECT *,
               100*(reported_value-trips)/reported_value AS pct_diff
        FROM t2
        LEFT JOIN published_june_5_2021 USING (year, month, day)
        """
df_agg_june_2021 = run_sql_query(query).convert_dtypes()
pu.show_nans_dtypes(df_agg_june_2021, show_transpose=True)
pu.highlight_multiple_columns_v2(
    df_agg_june_2021, ['year', 'month', 'day', 'pct_diff']
)

Unnamed: 0,year,month,day,day_name,trips,reported_value,pct_diff
missing,0,0,0,0,0,0,0
dtype,Int64,string[python],Int64,string[python],Int64,Int64,Float64


Unnamed: 0,year,month,day,day_name,trips,reported_value,pct_diff
0,2021,June,5,Saturday,24437,27000,9.492593


CPU times: user 335 ms, sys: 75 ms, total: 410 ms
Wall time: 270 ms


**Observations**

1. There is agreement between the **processed** and published bike share ridership on June 5th, 2021 to within 10%.

### Compare Year-over-Year (YoY) Growth in Total Processed Weekend Trips in 2020

Get the [published year-over-year growth in weekend bike share ridership in 2020](https://shifttransit.net/celebrating-10-years-of-bike-share-toronto/) (see the **2020** section)

In [21]:
# Single-row table holding the published 2020-vs-2019 year-over-year growth
# (in percent) of weekend ridership, keyed by the pair of years compared
published_yoy_2020 = pd.DataFrame(
    {
        'reference_year': [2019],
        'current_year': [2020],
        'reported_value': [65],
    }
)

Aggregated weekend trips for 2019 and 2020, together with their year-over-year growth, are shown below

In [22]:
%%time
df_proc_2019_2020 = pd.read_parquet(fpaths_processed[4:20]).convert_dtypes()
query = """
        WITH t1 AS (
            -- add indicator of weekend to all processed trips from 2019 & 2020
            SELECT year(started_at) AS year,
                   dayofweek(started_at) IN (6, 0) AS is_weekend,
                   trip_id
            FROM df_proc_2019_2020
        ),
        -- get yearly total by whether day was a weekend
        t2 AS (
            SELECT year,
                   is_weekend,
                   COUNT(DISTINCT(trip_id)) AS trips
            FROM t1
            WHERE is_weekend = True
            GROUP BY ALL
        ),
        -- reshape from tidy to untidy data using PIVOT
        t3 AS (
            PIVOT t2
            ON is_weekend || '_' || year
            USING sum(trips)
        ),
        -- calculate year-over-year growth in processed trips between 2019 and 2020
        t4 AS (
            SELECT *,
                   2019 AS 'reference_year',
                   2020 AS 'current_year',
                   100*(true_2020-true_2019)/true_2019 AS yoy_growth
            FROM t3
        )
        -- calculate percent difference between published and processed yearly totals
        SELECT *,
               100*(reported_value-yoy_growth)/reported_value AS pct_diff
        FROM t4
        LEFT JOIN published_yoy_2020 USING (reference_year, current_year)
        """
df_yoy_2019_2020 = run_sql_query(query).convert_dtypes()
pu.show_nans_dtypes(df_yoy_2019_2020, show_transpose=True)
pu.highlight_multiple_columns_v2(
    df_yoy_2019_2020, ['current_year', 'yoy_growth', 'pct_diff']
)

Unnamed: 0,true_2019,true_2020,reference_year,current_year,yoy_growth,reported_value,pct_diff
missing,0,0,0,0,0,0,0
dtype,Int64,Int64,Int32,Int32,Float64,Int64,Float64


Unnamed: 0,true_2019,true_2020,reference_year,current_year,yoy_growth,reported_value,pct_diff
0,553901,916721,2019,2020,65.50268,65,-0.773354


CPU times: user 5.99 s, sys: 932 ms, total: 6.92 s
Wall time: 3.39 s


**Observations**

1. The **processed** growth in weekend trips is in good agreement (<1% difference) with the published year-over-year growth.

### Compare Raw & Processed versus Published Number of Stations

Get the published yearly totals for number of stations in use across the network

In [23]:
# Published yearly network size (bikes and stations in use); the 2022 and 2023
# figures are built from the earlier totals plus the announced expansions
# listed in the Notes
infra_reported = pd.DataFrame(
    {
        'year': [2018, 2019, 2020, 2021, 2022, 2023],
        'bikes': [3_750, 5_000, 6_850, 6_850, 7_165, 7_165 + 945],
        'stations': [360, 465, 625, 670, 670 + 40, 670 + 40 + 110],
    }
)

**Notes**

1. Published totals for yearly stations in use are available below
   - [2018](https://bikesharetoronto.com/news/2018-highlights/) (360 stations)
   - [2019](https://bikesharetoronto.com/news/2019-milestones/) (465 stations)
   - [2020](https://bikesharetoronto.com/news/expansion-2020/) (625 stations)
   - [2021](https://learn.sharedusemobilitycenter.org/casestudy/bike-share-torontos-four-year-growth-plan-a-data-driven-community-focused-network-expansion/) (670 stations)
   - [2022](https://www.toronto.ca/legdocs/mmis/2022/pa/bgrd/backgroundfile-229580.pdf#page=12) (670 + 40 = 710 stations) (page 53 Table 3 from section **3.1 Implementation Schedule**)
   - [2023](https://bikesharetoronto.com/news/looking-back-on-2023/) (670 + 40 + 110 = 820 stations)
2. Published totals for yearly bikes in use are available below
   - [2018](https://bikesharetoronto.com/news/2018-highlights/) (3,750 bikes)
   - [2019](https://bikesharetoronto.com/news/2019-milestones/) (5,000 bikes)
   - [2020](https://bikesharetoronto.com/news/expansion-2020/) (6,850 bikes) (second paragraph of article *... system will grow to a total of 6,850 bikes, ...*)
   - [2021](https://www.toronto.ca/wp-content/uploads/2023/06/97f5-2022-Cycling-Year-in-Review-Final.pdf#page=3) (6,850 bikes, page 3 indicates no expansion so same number as in 2020)
   - [2022](https://www.toronto.ca/legdocs/mmis/2022/pa/bgrd/backgroundfile-229580.pdf#page=12) (7,165 bikes) (page 7 first paragraph of **Conclusion**)
   - [2023](https://www.toronto.ca/legdocs/mmis/2022/pa/bgrd/backgroundfile-229580.pdf#page=12) (7,165 + 945\* = 8,110 bikes) (page 7 first paragraph of **Conclusion**; \*assumes an average of 945 bikes are added per year in 2023, 2024 and 2025)

Aggregated yearly stations are shown below

In [24]:
%%time
cols_infra = ['started_at', 'start_station_id', 'end_station_id']
cols_infra_year = ['started_at_year']
df_raw_infra = (
    pd.read_parquet(fpaths_processed_raw, columns=cols_infra)
    .assign(started_at_year=lambda df: df['started_at'].dt.year)
    .drop(columns=['started_at'])
    .convert_dtypes()
)
df_proc_infra = (
    pd.read_parquet(fpaths_processed, columns=cols_infra_year+cols_infra[1:])
    .convert_dtypes()
)
query = """
        WITH t1 AS (
            -- get raw yearly stations used in departures and arrivals
            SELECT started_at_year AS year,
                   COUNT(DISTINCT(start_station_id)) AS stations_deps,
                   COUNT(DISTINCT(end_station_id)) AS stations_arrs
            FROM df_raw_infra
            -- WHERE started_at_year <> 2023
            GROUP BY started_at_year
        ),
        t2 AS (
            -- get processed yearly stations used in departures and arrivals
            SELECT started_at_year AS year,
                   COUNT(DISTINCT(start_station_id)) AS stations_deps,
                   COUNT(DISTINCT(end_station_id)) AS stations_arrs
            FROM df_proc_infra
            -- WHERE started_at_year <> 2023
            GROUP BY started_at_year
        ),
        -- get percentage difference between raw and published yearly totals
        t3 AS (
            SELECT year,
                   stations AS reported_stations,
                   GREATEST(stations_deps, stations_arrs) AS raw_stations,
                   100*(raw_stations-reported_stations)/reported_stations AS pct_diff_raw
            FROM t1
            INNER JOIN infra_reported USING (year)
        ),
        -- get percentage difference between processed and published yearly totals
        t4 AS (
            SELECT year,
                   stations AS reported_stations,
                   GREATEST(stations_deps, stations_arrs) AS proc_stations,
                   100*(proc_stations-reported_stations)/reported_stations AS pct_diff_proc
            FROM t2
            INNER JOIN infra_reported USING (year)
        )
        SELECT *
        FROM t3
        LEFT JOIN (SELECT * FROM t4) USING (year, reported_stations)
        ORDER BY year
        """
df_query = run_sql_query(query).convert_dtypes()
pu.show_nans_dtypes(df_query, show_transpose=True)
pu.highlight_multiple_columns_v2(
    df_query, ['year', 'pct_diff_raw', 'pct_diff_proc']
)

Unnamed: 0,year,reported_stations,raw_stations,pct_diff_raw,proc_stations,pct_diff_proc
missing,0,0,0,0,0,0
dtype,Int32,Int64,Int64,Float64,Int64,Float64


Unnamed: 0,year,reported_stations,raw_stations,pct_diff_raw,proc_stations,pct_diff_proc
0,2018,360,359,-0.277778,359,-0.277778
1,2019,465,469,0.860215,469,0.860215
2,2020,625,612,-2.08,611,-2.24
3,2021,670,629,-6.119403,628,-6.268657
4,2022,710,684,-3.661972,684,-3.661972
5,2023,820,815,-0.609756,813,-0.853659


CPU times: user 12.7 s, sys: 1.65 s, total: 14.3 s
Wall time: 9.31 s


**Observations**

1. The **processed** and raw totals for the yearly number of stations in use across the network are in good agreement with the published totals: within 4% for every year except 2021, where both are about 6% below the published total.

## Discussion

### Conclusions

1. This notebook has processed the raw bike share ridership data in preparation for exploratory data analysis.
2. The following sanity checks were performed on the processed ridership data using totals in published reports about the performance of the bike share network
   - verified yearly totals for bike share ridership
   - verified that approximately 21,000 trips were taken on the busiest Wednesday of August 2019 (the other Wednesdays of that month were 14–21% below this published figure)
   - verified monthly cumulative ridership totals starting in April 2022 and ending in December 2022
   - verified that processed ridership on June 5, 2021 is within 10% of the published total of over 27,000 trips
   - verified that 65% year-over-year growth in weekend trips was observed in 2020 relative to 2019
   - verified the number of active yearly stations

### Limitations

The data processing performed in this notebook likely differs from the approach used to prepare the data behind the published reports on the use and size of bike share in Toronto. This is likely the main reason for the discrepancies between the totals found here and those in the published reports.

## Summary of Assumptions

1. For the current business use-case, the current date is assumed to be in April 2023. The station metadata was retrieved in February 2024, so it is assumed that the station metadata from February 2024 matches the station metadata from April 2023.

## Next Step

The next notebook will perform exploratory data analysis to extract insights required by the business use-case for this project.

## Version Information

In [26]:
# Third-party packages whose installed versions are reported below
packages = "requests contexttimer tqdm numpy pandas pyarrow duckdb".split()

# Print the run timestamp, Python/IPython versions, machine details and
# package versions so the outputs above can be reproduced
print(
    watermark(
        updated=True,
        current_date=True,
        current_time=True,
        timezone=True,
        custom_time="%Y-%m-%d %H:%M:%S %Z",
        python=True,
        machine=True,
        packages=",".join(packages),
    )
)

Last updated: 2024-03-01 06:33:09 UTC

Python implementation: CPython
Python version       : 3.11.8
IPython version      : 8.22.1

requests    : 2.31.0
contexttimer: 0.3.3
tqdm        : 4.66.2
numpy       : 1.26.4
pandas      : 2.2.1
pyarrow     : 15.0.0
duckdb      : 0.10.0

Compiler    : GCC 12.3.0
OS          : Linux
Release     : 6.6.10-76060610-generic
Machine     : x86_64
Processor   : x86_64
CPU cores   : 12
Architecture: 64bit

