In [1]:
from datetime import datetime as dt
import geopandas as gpd
import pandas as pd
from tqdm import tqdm
import numpy as np

from data.constants import (L_RIDERSHIP_TABLE, BUS_RIDERSHIP_TABLE)
from data.cta import CTAClient
from data.divvy import DivvyClient
from data.uber import UberClient
from data.datemath import iso_to_ymd, is_iso, to_ymd, is_ymd

In [2]:
# Destination files for this notebook's raw pulls, relative to the working directory.
_raw_dir = "../data/raw"
train_rides_out = f"{_raw_dir}/train_rides.csv"
bus_rides_out = f"{_raw_dir}/bus_rides.csv"
bike_rides_out = f"{_raw_dir}/bike_rides.geoparquet"
uber_tract_rides_out = f"{_raw_dir}/uber_tract_rides.parquet"
uber_comm_rides_out = f"{_raw_dir}/uber_comm_rides.parquet"

In [3]:
# Data-source clients. The numeric constructor arguments are presumably request
# timeouts in seconds (TODO confirm against CTAClient / UberClient): one minute
# for CTA, fifteen minutes for the very large Uber table.
_cta_client_arg = 60
_uber_client_arg = 15 * 60
cta_client = CTAClient(_cta_client_arg)
divvy_client = DivvyClient()
uber_client = UberClient(_uber_client_arg)

# Pipeline In

(None)

# Define Data Scope

Some of these tables are rather large so we need to make good choices about
what to pull in. We should abstract any logic that we might need to re-do
if we want to pull in additional dates, and cache anything that takes a while to load.

Looking ahead, our models use look-back windows of one week, one month, and year-to-date (YTD), all at daily granularity.

Therefore we will pull in data from JANUARY 1, 2024 through AUGUST 31, 2024.

Note: we want to finish the whole month of August to ensure we have a FULL 
week of data for the DNC. Otherwise we may mis-infer a weekly/monthly effect of DNC on ridership
when actually we just mechanically omitted some days!

In [4]:
# Inclusive pull window: first instant of Jan 1 through the last whole second of Aug 31, 2024.
_scope_start = dt(year=2024, month=1, day=1)
_scope_end = dt(year=2024, month=8, day=31, hour=23, minute=59, second=59)
data_start_iso = _scope_start.isoformat()
data_end_iso = _scope_end.isoformat()

# Train Rides

In [5]:
# Per-station daily 'L' ridership, filtered server-side to the pull window.
_train_date_window = f"date between '{data_start_iso}' and '{data_end_iso}'"
train_rides = cta_client.soda_get_all(
    L_RIDERSHIP_TABLE,
    select="station_id,date,daytype,rides",
    where=_train_date_window,
)

# Bus Rides

In [6]:
# Per-route daily bus ridership, filtered server-side to the pull window.
_bus_date_window = f"date between '{data_start_iso}' and '{data_end_iso}'"
bus_rides = cta_client.soda_get_all(
    BUS_RIDERSHIP_TABLE,
    select="route,date,daytype,rides",
    where=_bus_date_window,
)

# Bike Rides

The Divvy ridership data are at the trip (ride) granularity, so we need to aggregate them to the station-day level.

In [7]:
def agg_ridership(trips: pd.DataFrame) -> pd.DataFrame:
    """
    Get ride counts by station and date.

    Each trip contributes one "start" ride to its start station on its start
    date and one "end" ride to its end station on its end date.

    Parameters
    ----------
    trips : pd.DataFrame
        One row per trip. Needs datetime columns ``start_time`` / ``end_time``,
        ``start_``/``end_``-prefixed ``station_id`` and ``station_name`` columns,
        and a ``vintage`` column (unprefixed or side-prefixed). If any column
        name contains ``geometry``, ``start_geometry`` / ``end_geometry`` are
        expected as well and become part of the grouping key.
        The input frame is not modified.

    Returns
    -------
    pd.DataFrame
        One row per (station_id, station_name, date, vintage[, geometry]) with
        ``start_rides`` and ``end_rides`` (NaN where that side had no trips)
        and ``rides`` = start + end with missing sides counted as 0.
        Trips whose key columns are null are dropped (pandas groupby dropna).
    """
    def _strip_prefix(prefix: str):
        # Strip only a leading prefix; str.replace would also rewrite names that
        # merely contain the token (e.g. "weekend_..." contains "end_").
        return lambda col: col[len(prefix):] if col.startswith(prefix) else col

    # assign() returns a new frame, so the caller's trips are left untouched.
    trips = trips.assign(
        start_date=trips['start_time'].dt.date.apply(to_ymd),
        end_date=trips['end_time'].dt.date.apply(to_ymd),
    )
    id_cols = ['station_id', 'station_name', 'date', 'vintage']
    if any('geometry' in col for col in trips.columns):
        id_cols.append('geometry')

    def _count(side: str) -> pd.DataFrame:
        # Relabel one side's columns to the shared key names, then count trips per key.
        return (trips.rename(columns=_strip_prefix(f'{side}_'))
                .groupby(id_cols, as_index=False).size()
                .rename(columns={'size': f'{side}_rides'}))

    rides = _count('start').merge(_count('end'), on=id_cols, how='outer')
    rides['rides'] = rides['start_rides'].fillna(0) + rides['end_rides'].fillna(0)
    return rides

In [8]:
# Takes ~3m to run: downloads and aggregates every Divvy trip chunk for the
# year(s) in scope (12 chunks for 2024 in the captured run).
bike_trip_chunks = divvy_client.s3_bike_trips(dt.fromisoformat(data_start_iso).year,
                                              dt.fromisoformat(data_end_iso).year)
bike_rides_all = pd.concat([agg_ridership(chunk) for chunk in tqdm(bike_trip_chunks)],
                           ignore_index=True)

# A trip that starts in one chunk but ends on a date that the next chunk also
# covers (e.g. an overnight ride at a month boundary, assuming chunks are split
# by trip start time) yields a second row for the same (station, date) key.
# Sum such rows so each key appears once; min_count=1 keeps start/end_rides NaN
# when no chunk reported that side. sort=False preserves the original row order.
_bike_ride_cols = ['start_rides', 'end_rides', 'rides']
_bike_key_cols = [c for c in bike_rides_all.columns if c not in _bike_ride_cols]
bike_rides_all = (bike_rides_all
                  .groupby(_bike_key_cols, as_index=False, sort=False, dropna=False)[_bike_ride_cols]
                  .sum(min_count=1))

# Trips in the chunks can fall outside the pull window; keep only in-scope dates.
_bike_in_scope = ((bike_rides_all['date'] >= iso_to_ymd(data_start_iso))
                  & (bike_rides_all['date'] <= iso_to_ymd(data_end_iso)))
bike_rides = gpd.GeoDataFrame(bike_rides_all.loc[_bike_in_scope], geometry='geometry')

0it [00:00, ?it/s]

DEBUG: populating bucket paths.


12it [02:41, 13.49s/it]


In [9]:
# Note: according to https://data.cityofchicago.org/Transportation/Divvy-Bicycle-Stations/bbyy-e7gq/about_data
# each station contains multiple bike docks. However, the reported station
# locations have poor accuracy and precision: nominally there are ~900k unique
# station points, whereas from the normalized station list we expect ~3.7k or fewer stations.
# We'll address this in a subsequent notebook.

# Uber Rides

**Query Strategy**
Because the Socrata database is huge and queries consistently time out (responses take >20 min),
we need a careful query strategy. We first break every query into monthly
(falling back to daily) chunks and then concatenate the results. We also split
SELECT statements into multiple queries to reduce the cardinality
of each GROUP BY, i.e., we separately query pickups and dropoffs, and separately census tracts
vs. community areas.

**Pickups vs Dropoffs**
Uber data, like bike data, has ingress and egress locations. Bus and train data
contain only ingress. Instead of throwing out egress data, we make the substantive
choice to model RIDES per unit, regardless of ingress/egress status.

*Scale*:
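This essentially
doubles the overall Uber and bike ridership relative to bus and train. That is
not an issue for regression as long as the difference in scale is absorbed by
transit-type fixed effects, which holds exactly in a log-linear specification,
where a constant multiplicative factor becomes an additive shift.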

*Censorship*:
Observing egress only for some transit types is a form of sample selection (censoring),
where whether an outcome is observed depends on $X$, i.e. $\Pr(Y \text{ observed} \mid X)$
varies with $X$ (here, transit type). TODO: I need to review my notes about data
censoring. AFAIK, it doesn't introduce bias if selection depends only on $X$.

*Double counting*:
Related to the scale issue, this permits double-counting rides that start and
end in the same place. I'd argue these double-counts are consistent across
transit types. I'll assume the vast majority of train and bus riders do not
exit at the same station they board (exceptions include buskers and commuters
that quickly double-back.) OTOH, a circuit on a bike makes a lot of sense
in a sight-seeing or errands context. And a short uber drive across the neighborhood
also sounds plausible.

In [10]:
from time import sleep

from requests.exceptions import ReadTimeout
# Aliased so urllib3's TimeoutError does not shadow the built-in TimeoutError.
from urllib3.exceptions import TimeoutError as Urllib3TimeoutError

# Seconds to wait between attempts after a read timeout.
UBER_RETRY_WAIT_S = 60
# Give up after this many attempts per query; None keeps retrying indefinitely.
UBER_MAX_ATTEMPTS = None


def _with_retries(fetch, label, wait_s=UBER_RETRY_WAIT_S, max_attempts=UBER_MAX_ATTEMPTS):
    """
    Call ``fetch()`` and return its result, retrying after read timeouts.

    Each query is retried on its own, so a timeout in one query does not
    discard (and re-download) queries that already succeeded.

    Raises the last timeout once ``max_attempts`` attempts have failed
    (never, when ``max_attempts`` is None).
    """
    attempt = 1
    while True:
        try:
            return fetch()
        except (ReadTimeout, Urllib3TimeoutError) as err:
            if max_attempts is not None and attempt >= max_attempts:
                raise
            print(f"Read timeout on {label} (attempt {attempt}): {err!r}. Retrying in {wait_s}s")
            sleep(wait_s)
            attempt += 1


def _fetch_uber_daily_counts(pickup: bool, tract: bool) -> pd.DataFrame:
    """
    Query daily Uber trip counts per geography for the pull window.

    pickup=True groups by trip start day and pickup location (columns:
    start_date, pickup_*, rides); pickup=False uses trip end day and dropoff
    location (end_date, dropoff_*, rides). tract selects census tracts vs.
    community areas.
    """
    side, role = ("start", "pickup") if pickup else ("end", "dropoff")
    geo_col = f"{role}_census_tract" if tract else f"{role}_community_area"
    date_col = f"{side}_date"
    # NOTE(review): where_end is a YMD day; confirm soda_get_uber treats it as
    # inclusive of the whole final day rather than its midnight instant.
    return uber_client.soda_get_uber(
        select=f"date_trunc_ymd(trip_{side}_timestamp) as {date_col}, {geo_col}, count(trip_id) as rides",
        where_start=iso_to_ymd(data_start_iso),
        where_end=iso_to_ymd(data_end_iso),
        group=f"{date_col}, {geo_col}",
        pickup=pickup,
        tract=tract)


uber_comm_pickups = _with_retries(
    lambda: _fetch_uber_daily_counts(pickup=True, tract=False), "community-area pickups")
uber_comm_dropoffs = _with_retries(
    lambda: _fetch_uber_daily_counts(pickup=False, tract=False), "community-area dropoffs")
uber_tract_pickups = _with_retries(
    lambda: _fetch_uber_daily_counts(pickup=True, tract=True), "census-tract pickups")
uber_tract_dropoffs = _with_retries(
    lambda: _fetch_uber_daily_counts(pickup=False, tract=True), "census-tract dropoffs")

In [11]:
# Fix mismatched date formats: normalize every Uber date column to YMD strings.
# Three frames arrive as ISO timestamps; uber_tract_pickups mixes ISO and YMD.
def _dates_to_ymd(dates: pd.Series) -> pd.Series:
    """
    Return ``dates`` with ISO entries converted to YMD; YMD entries pass through.

    Only non-YMD entries are handed to ``iso_to_ymd``. With ``np.where``, the
    conversion ran on every row, including ones already in YMD. Because YMD
    inputs are left untouched, re-running this cell is safe.

    Raises AssertionError if any entry is neither ISO nor YMD, or if the
    result is not entirely YMD.
    """
    already_ymd = dates.apply(is_ymd).astype(bool)
    assert (already_ymd | dates.apply(is_iso).astype(bool)).all(), \
        f"unexpected date format in {dates.name!r}"
    converted = dates.copy()
    converted.loc[~already_ymd] = dates.loc[~already_ymd].apply(iso_to_ymd)
    assert converted.apply(is_ymd).all(), f"{dates.name!r} not fully converted to YMD"
    return converted


uber_comm_dropoffs['end_date'] = _dates_to_ymd(uber_comm_dropoffs['end_date'])
uber_comm_pickups['start_date'] = _dates_to_ymd(uber_comm_pickups['start_date'])
uber_tract_dropoffs['end_date'] = _dates_to_ymd(uber_tract_dropoffs['end_date'])
uber_tract_pickups['start_date'] = _dates_to_ymd(uber_tract_pickups['start_date'])

In [12]:
# Prep for combining pickups and dropoffs: put every Uber count frame on a shared
# (date, geo, *_rides) schema. The combined frames are keyed by DATE and GEO and
# carry three rides columns (start, end, total). start/end are kept so a
# robustness check can model start rides only, for exact parity with train and bus.
def _to_merge_schema(frame, date_col, geo_col, geo_name, rides_name):
    """Rename one Uber count frame's date/geo/rides columns to the shared names."""
    return frame.rename(columns={date_col: 'date', geo_col: geo_name, 'rides': rides_name})


uber_tract_pickups = _to_merge_schema(
    uber_tract_pickups, 'start_date', 'pickup_census_tract', 'tract', 'start_rides')
uber_tract_dropoffs = _to_merge_schema(
    uber_tract_dropoffs, 'end_date', 'dropoff_census_tract', 'tract', 'end_rides')
uber_comm_pickups = _to_merge_schema(
    uber_comm_pickups, 'start_date', 'pickup_community_area', 'comm', 'start_rides')
uber_comm_dropoffs = _to_merge_schema(
    uber_comm_dropoffs, 'end_date', 'dropoff_community_area', 'comm', 'end_rides')

In [13]:
# Tracts and community areas stay in separate frames because they are separate
# units of aggregation, just like the train (station) and bus (route) frames.
def _combine_pickups_dropoffs(pickups: pd.DataFrame, dropoffs: pd.DataFrame,
                              geo_col: str) -> pd.DataFrame:
    """
    Outer-join daily pickup and dropoff counts on (date, geo_col).

    Returns start_rides and end_rides (NaN where that side had no trips) plus
    ``rides`` = start + end, with missing sides counted as 0.
    Raises pandas.errors.MergeError if either side repeats a (date, geo) key,
    which would otherwise silently multiply rows in the outer join.
    """
    combined = pickups.merge(dropoffs, on=['date', geo_col], how='outer',
                             validate='one_to_one')
    for col in ('start_rides', 'end_rides'):
        # NOTE(review): SODA JSON may deliver aggregates as strings; this is a
        # no-op if soda_get_uber already returns numeric counts.
        combined[col] = pd.to_numeric(combined[col])
    combined['rides'] = combined['start_rides'].fillna(0) + combined['end_rides'].fillna(0)
    return combined


uber_tract_rides = _combine_pickups_dropoffs(uber_tract_pickups, uber_tract_dropoffs, 'tract')
uber_comm_rides = _combine_pickups_dropoffs(uber_comm_pickups, uber_comm_dropoffs, 'comm')

# Pipeline out

In [14]:
# Persist the pulls: train/bus tables as CSV; bike (GeoDataFrame) and Uber as Parquet.
for frame, out_path in ((train_rides, train_rides_out),
                        (bus_rides, bus_rides_out)):
    frame.to_csv(out_path, index=False)
for frame, out_path in ((bike_rides, bike_rides_out),
                        (uber_tract_rides, uber_tract_rides_out),
                        (uber_comm_rides, uber_comm_rides_out)):
    frame.to_parquet(out_path, index=False)