In [1]:
import pandas as pd
import numpy as np
import geopandas as gpd
import sqlalchemy
import folium
from folium import plugins
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import datetime as dt

In [2]:
import folium
from folium.plugins import MarkerCluster
import pysal as ps
from pysal.viz import mapclassify

import ipywidgets as widgets

In [3]:
import partridge as ptg
# Shorthand for slicing MultiIndex levels inside .loc[...] lookups.
idx = pd.IndexSlice

In [4]:
def positions_from_db(db_name, limit, offset=0, routes=('all',)):
    '''Fetch the newest weekday rows of `vehicle_positions` from the logging DB.

    Parameters
    ----------
    db_name : str
        Database (schema) name, interpolated into the MySQL connection URL.
    limit : int
        Maximum number of rows to return.
    offset : int, optional
        Number of rows to skip (rows are ordered newest first). 0 means no
        OFFSET clause is emitted.
    routes : sequence of str, optional
        route_id values to keep. If the first element is 'all' (the default),
        or `routes` is None, no route filter is applied. An empty sequence
        selects no rows.

    Returns
    -------
    (pandas.DataFrame, int)
        The selected rows, and the total (unfiltered) row count of
        `vehicle_positions`.
    '''
    import os

    cols = 'oid, trip_id, route_id, trip_start_time, trip_start_date, vehicle_id, occupancy_status, position_latitude, position_longitude, timestamp'
    aws_host = 'gtfs-rt-logging.clakglowlpps.us-west-2.rds.amazonaws.com'
    # NOTE(review): credentials should not live in the notebook. They can now
    # be supplied through the environment; the literals remain only as
    # fallbacks so existing runs keep working -- rotate and remove them.
    user = os.environ.get('GTFS_RT_DB_USER', 'halfempty')
    password = os.environ.get('GTFS_RT_DB_PASSWORD', 'tentoninety')
    eng = sqlalchemy.create_engine(
        f'mysql://{user}:{password}@{aws_host}:3306/{db_name}')

    def quote(value):
        # Render a value as a MySQL string literal (escape backslash and quote).
        text = str(value).replace('\\', '\\\\').replace("'", "''")
        return f"'{text}'"

    weekdays_only = 'weekday(timestamp) in (0,1,2,3,4)'

    if routes is None:
        routes = ['all']
    elif isinstance(routes, str):
        # A bare string would otherwise be iterated character by character.
        routes = [routes]
    else:
        routes = list(routes)

    if not routes:
        # An empty route list means "no route matches"; `in ()` is invalid SQL.
        where = 'where 1 = 0'
    elif routes[0] == 'all':
        where = f'where {weekdays_only}'
    else:
        # Joining explicitly avoids the trailing comma that str(tuple(...))
        # produces for a single route, e.g. "('1',)".
        route_list = ', '.join(quote(route) for route in routes)
        where = f'where route_id in ({route_list}) and {weekdays_only}'

    # int() guards the only other values interpolated into the statement.
    limit = int(limit)
    offset = int(offset)
    offset_clause = f'offset {offset}' if offset else ''

    query = f'select {cols} from vehicle_positions {where} order by timestamp desc limit {limit} {offset_clause}'
    print(query)
    df = pd.read_sql(query, con=eng)

    # Engine.execute() was removed in SQLAlchemy 2.0; an explicit connection
    # with text() works on both 1.x and 2.x.
    with eng.connect() as conn:
        count = conn.execute(
            sqlalchemy.text('select count(*) from vehicle_positions')).scalar()
    return df, count


In [5]:
def gdf_from_positions_df(df):
    '''Turn a vehicle-positions DataFrame into a point GeoDataFrame (EPSG:4326).

    Rows lacking either `position_latitude` or `position_longitude` are
    dropped before the point geometries are built.
    '''
    located = df.dropna(subset=['position_latitude', 'position_longitude'])
    # points_from_xy takes x (longitude) first, then y (latitude).
    points = gpd.points_from_xy(located['position_longitude'],
                                located['position_latitude'])
    return gpd.GeoDataFrame(located, geometry=points, crs='EPSG:4326')

In [6]:
def feed_from_path(path):
    '''Load a geo-enabled GTFS feed with Partridge from the given filepath.

    Only trips whose service_id is among those returned by
    `ptg.read_busiest_date` for this feed are included in the view.
    '''
    _busiest_date, busiest_service_ids = ptg.read_busiest_date(path)
    trips_filter = {'service_id': busiest_service_ids}
    return ptg.load_geo_feed(path, {'trips.txt': trips_filter})

In [7]:
# Load the MBTA GTFS feed from the local zip; feed_from_path restricts trips
# to the service_ids of the feed's busiest date.
mbta_gtfs = feed_from_path('./data/mbta/08282020_gtfs.zip')

In [8]:
def aggregate_by_stop(df, gtfs, buffer_ft=250):
    '''Count occupancy reports per stop, route and hour of day.

    Every stop served by a route present in `df` is buffered, and each vehicle
    position falling inside a buffer is attributed to that stop. Positions are
    matched purely by location, so a vehicle passing a stop of another route
    is counted there as well.

    Parameters
    ----------
    df : geopandas.GeoDataFrame
        Vehicle positions with `route_id`, `timestamp` (datetime),
        `occupancy_status` and point geometry.
    gtfs : partridge feed
        Must expose `trips`, `stop_times` and a geo-enabled `stops` table.
    buffer_ft : number, optional
        Buffer radius around each stop, in the units of EPSG:3586 (feet,
        per the original author's note). Defaults to 250.

    Returns
    -------
    (pandas.DataFrame, geopandas.GeoDataFrame)
        Report counts indexed by (stop_id, route_id, hour) with one column per
        occupancy_status value, and the buffered stops indexed by stop_id.
    '''
    import inspect

    routes = df['route_id'].unique()

    trips_crowding_rts = gtfs.trips[gtfs.trips['route_id'].isin(routes)]['trip_id']
    stops_crowding_routes = (
        gtfs.stop_times[gtfs.stop_times['trip_id'].isin(trips_crowding_rts)]
        .drop_duplicates(subset=['stop_id'])['stop_id'])
    stops_with_crowding = gtfs.stops[gtfs.stops['stop_id'].isin(stops_crowding_routes)]
    stops_with_crowding = stops_with_crowding[['stop_id', 'stop_name', 'geometry']]

    # Project to a planar CRS so the buffer distance is a length, not degrees.
    # https://spatialreference.org/ref/epsg/3586/
    stops_projected = stops_with_crowding.to_crs('EPSG:3586')
    stops_projected['geometry'] = stops_projected['geometry'].buffer(buffer_ft)

    # The raw coordinates are redundant once the geometry exists; tolerate
    # frames that never had them.
    positions = (df.to_crs('EPSG:3586')
                 .drop(columns=['position_latitude', 'position_longitude'],
                       errors='ignore'))

    # geopandas renamed sjoin's `op` keyword to `predicate` (0.10) and later
    # removed `op`; use whichever the installed version accepts.
    sjoin_params = inspect.signature(gpd.sjoin).parameters
    predicate_kw = 'predicate' if 'predicate' in sjoin_params else 'op'
    joined = gpd.sjoin(stops_projected, positions, how='left',
                       **{predicate_kw: 'contains'})

    joined['hour'] = joined['timestamp'].dt.hour
    # value_counts appends occupancy_status as the 4th index level; unstacking
    # it yields one column per status value.
    report_counts = (joined
                     .groupby(['stop_id', 'route_id', 'hour'])['occupancy_status']
                     .value_counts()
                     .unstack(level=3))

    geo_df = stops_projected.set_index('stop_id')

    return report_counts, geo_df

In [9]:
def agg_positions_with_crowding(db_name, gtfs, batch_size=5*10**5, sample_size=100000):
    '''Aggregate occupancy reports by stop for routes that publish crowding data.

    A sample of the newest rows is used to detect which routes report more
    than one distinct occupancy_status. Positions for those routes are then
    fetched in batches, aggregated with `aggregate_by_stop`, and the per-batch
    counts are summed.

    Parameters
    ----------
    db_name : str
        Database name passed to `positions_from_db`.
    gtfs : partridge feed
        Feed passed to `aggregate_by_stop`.
    batch_size : int, optional
        Rows fetched per batch (default 5*10**5).
    sample_size : int, optional
        Rows sampled to detect routes with crowding data (default 100000).

    Returns
    -------
    (pandas.DataFrame, geopandas.GeoDataFrame)
        Summed report counts, and the buffered stops seen in any batch.

    Raises
    ------
    ValueError
        If no route in the sample has varying occupancy data, or no positions
        are returned for those routes.
    '''
    # NOTE(review): db1/db2 expose the first and the latest later batch for
    # interactive debugging. Kept because other cells may read them.
    global db1, db2

    # Fetch a subset and sense which routes provide crowding data.
    subset_allrt, row_count = positions_from_db(db_name, sample_size)

    # A route has real crowding data only if its statuses vary; a route with
    # no status at all (nunique == 0) carries no data either.
    statuses_per_route = subset_allrt.groupby('route_id')['occupancy_status'].nunique()
    routes_with_crowding = list(statuses_per_route[statuses_per_route > 1].index)
    if not routes_with_crowding:
        raise ValueError(
            f'no route in the newest {sample_size} rows of {db_name!r} '
            'reports varying occupancy_status')

    ##TODO too small for MBTA but need memory for gtfs stops, implement batching every 10**6 on final.
    aggregated_dfs = None
    geo_parts = []
    for offset in range(0, row_count, batch_size):
        positions, _count = positions_from_db(
            db_name, batch_size, offset=offset, routes=routes_with_crowding)
        if positions.shape[0] == 0:
            break
        gdf_crowding = gdf_from_positions_df(positions)
        aggregated_df, batch_geo = aggregate_by_stop(gdf_crowding, gtfs)
        geo_parts.append(batch_geo)
        if aggregated_dfs is None:
            db1 = aggregated_df
            aggregated_dfs = aggregated_df
        else:
            db2 = aggregated_df
            # fill_value=0 so a count missing from one batch does not turn
            # the sum into NaN.
            aggregated_dfs = aggregated_dfs.add(aggregated_df, fill_value=0)

    if aggregated_dfs is None:
        raise ValueError(
            f'no vehicle positions returned from {db_name!r} for routes '
            f'{routes_with_crowding}')

    # Each batch only knows the stops of the routes it contained; keep the
    # union so every aggregated stop has a geometry.
    geo_df = pd.concat(geo_parts)
    geo_df = geo_df[~geo_df.index.duplicated(keep='first')]
    return aggregated_dfs, geo_df


In [10]:
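# Queries the remote database in batches. Uncomment to (re)build `mbta` and
# `mbta_geo`, which later cells reference.
# mbta, mbta_geo = agg_positions_with_crowding('mbta', mbta_gtfs)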

In [10]:
def in_stop_list(x, stops_by_route=None):
    '''Keep the rows of a single-route frame whose stop belongs to that route.

    Parameters
    ----------
    x : pandas.DataFrame
        Rows for one route, with `route_id` and `stop_id` columns; the route
        is read from the first row.
    stops_by_route : pandas.Series, optional
        Maps route_id to an iterable of stop_ids. When omitted, the
        module-level `stops_per_rt` is used (NameError if it does not exist).

    Returns
    -------
    pandas.DataFrame or None
        The filtered rows, or None (after printing a notice) when the route
        is absent from the lookup.
    '''
    if stops_by_route is None:
        # Previously a missing global was swallowed by a bare `except` and
        # misreported as "no stop data"; let it surface instead.
        stops_by_route = stops_per_rt
    if len(x) == 0:
        return x
    route_id = x['route_id'].iloc[0]
    try:
        route_stops = list(stops_by_route.loc[route_id])
    except KeyError:
        print(f"no stop data for route {route_id}!")
        return None
    return x[x['stop_id'].isin(route_stops)]

In [11]:
def filter_actual_stops(df, gtfs):
    '''Drop (route, stop) rows where the stop is not served by that route.

    Parameters
    ----------
    df : pandas.DataFrame
        Report counts whose index contains `stop_id`, `route_id` and `hour`.
    gtfs : partridge feed
        Must expose `trips` (trip_id, route_id) and `stop_times`
        (trip_id, stop_id).

    Returns
    -------
    pandas.DataFrame
        Rows of `df` restricted to stops served by the row's route, indexed by
        (route_id, stop_id, hour) and ordered by route_id. `route_id` is also
        kept as a regular column, as before. Routes absent from the feed are
        dropped, and a notice is printed for each.
    '''
    # Only route_id is needed from trips; direction_id is optional in GTFS.
    route_of_trip = gtfs.trips.set_index('trip_id')[['route_id']]
    stops_per_rt = (gtfs.stop_times.set_index('trip_id')
                    .join(route_of_trip)
                    .groupby('route_id')['stop_id']
                    .unique())

    reset = df.reset_index()

    # Iterate over the groups explicitly: groupby.apply no longer reliably
    # hands the grouping column to the callback in recent pandas versions.
    kept = []
    for route_id, route_rows in reset.groupby('route_id'):
        if route_id not in stops_per_rt.index:
            print(f"no stop data for route {route_id}!")
            continue
        served = route_rows['stop_id'].isin(stops_per_rt.loc[route_id])
        kept.append(route_rows[served])

    filtered = pd.concat(kept) if kept else reset.iloc[0:0]

    # drop=False keeps the route_id column alongside the index level, which is
    # the layout the original produced.
    return (filtered
            .set_index('route_id', drop=False)
            .set_index(['stop_id', 'hour'], append=True))

In [67]:
# Show the aggregated counts with the index levels flattened into columns.
# NOTE(review): in the visible notebook `mbta` is only assigned by the
# commented-out agg_positions_with_crowding call, so this cell fails on a
# fresh kernel unless that call is run first.
mbta.reset_index()

occupancy_status,stop_id,route_id,hour,EMPTY,FEW_SEATS_AVAILABLE,FULL,MANY_SEATS_AVAILABLE
0,1,1,0.0,20.0,6.0,,42.0
1,1,1,1.0,21.0,2.0,,34.0
2,1,1,2.0,19.0,,,33.0
3,1,1,3.0,9.0,,,33.0
4,1,1,4.0,11.0,2.0,,26.0
...,...,...,...,...,...,...,...
282048,99991,Shuttle-Generic,12.0,1.0,,,
282049,99991,Shuttle-Generic,13.0,1.0,,,
282050,99991,Shuttle-Generic,16.0,2.0,,,
282051,99991,Shuttle-Generic,18.0,1.0,,,


In [68]:
# Keep only (route, stop) pairs where the stop is served by that route in the
# GTFS feed; routes missing from the feed are reported in the output below.
mbta_filtered = filter_actual_stops(mbta, mbta_gtfs)

no stop data for route 62!
no stop data for route Shuttle-Generic!
no stop data for route Shuttle-Generic-Red!


In [69]:
# Spot check: distinct stop_ids (index level 1) retained for route '1'.
mbta_filtered.loc[idx['1',:,:],:].index.get_level_values(1).unique()

Index(['1', '10003', '101', '10100', '10101', '102', '104', '10590', '106',
       '107', '108', '109', '110', '187', '188', '2', '57', '58', '59', '6',
       '62', '63', '64', '66', '67', '68', '69', '71', '72', '73', '74', '75',
       '77', '79', '80', '82', '83', '84', '854', '856', '87', '88', '89',
       '91', '93', '95', '97', '99'],
      dtype='object', name='stop_id')

In [70]:
# Persist the filtered counts so later sessions can reload them from disk.
mbta_filtered.to_parquet('./data/mbta/mbta_filtered.parquet')

In [71]:
## The data covers approximately Aug 17 to Sep 4.
## TODO: implement timestamping in the DataFrame fetch, or load from the S3 files instead.

In [12]:
# Reload the filtered counts written by the to_parquet cell above.
mbta_filtered = pd.read_parquet('./data/mbta/mbta_filtered.parquet')

In [13]:
# Display the reloaded frame; note that route_id appears both as an index
# level and as a regular column.
mbta_filtered

Unnamed: 0_level_0,Unnamed: 1_level_0,occupancy_status,route_id,EMPTY,FEW_SEATS_AVAILABLE,FULL,MANY_SEATS_AVAILABLE
route_id,stop_id,hour,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,1,0.0,1,20.0,6.0,,42.0
1,1,1.0,1,21.0,2.0,,34.0
1,1,2.0,1,19.0,,,33.0
1,1,3.0,1,9.0,,,33.0
1,1,4.0,1,11.0,2.0,,26.0
...,...,...,...,...,...,...,...
94,63241,19.0,94,24.0,,,
94,63241,20.0,94,11.0,,,
94,63241,21.0,94,9.0,,,1.0
94,63241,22.0,94,11.0,,,


In [75]:
# Persist the stop geometries as GeoJSON.
# NOTE(review): in the visible notebook `mbta_geo` is only assigned by the
# commented-out agg_positions_with_crowding call (or by the reload cell that
# follows), so this cell depends on execution order.
mbta_geo.to_file('./data/mbta/mbta_geo.geojson', driver="GeoJSON")

In [14]:
# Reload the stop geometries from GeoJSON and index them by stop_id.
mbta_geo = gpd.read_file('./data/mbta/mbta_geo.geojson').set_index('stop_id', drop=True)

### next: 
   * view logic, calculate metric
   * visualizer
