In [6]:
from datetime import datetime
import decimal
import json
import requests
from zipfile import ZipFile

import boto3
import numpy as np
import psycopg2
import pandas as pd
from sklearn.neighbors import BallTree

import config as cfg

In [7]:
# Load the exploded King County Metro route segments (GeoJSON feature collection)
with open('kcm_routes_exploded_modified.geojson', 'r') as f:
    kcm_routes = json.load(f)

# DynamoDB allows at most a two-part composite key, so the local/express flag is
# folded into the route id as a trailing digit: 0 for local ('L'), 1 for anything else.
for feature in kcm_routes['features']:
    props = feature['properties']
    suffix = '0' if props['LOCAL_EXPR'] == 'L' else '1'
    feature['route_id'] = int(f"{props['ROUTE_ID']}{suffix}")
    feature['segment_id'] = int(props['SEG_ID'])

In [8]:
# Time window: the 24 hours ending now, as timestamps rounded to whole seconds
end_time = round(datetime.now().timestamp())
start_time = end_time - 24 * 60 * 60

# Querying the last 24 hours of data from the RDS database is currently disabled;
# a saved CSV export is loaded instead. Sketch of the intended query:
# conn = psycopg2.connect(
#     host=cfg.HOST,
#     database=cfg.DATABASE,
#     user=cfg.UID,
#     password=cfg.PWD)
#
# data = query_table(conn, ...)
# query_text = ('SELECT * '
#               'FROM active_trips_study '
#               'WHERE collectedtime '
#               'BETWEEN <start_time> AND <end_time>;')
daily_results = pd.read_csv('10_25_2020.csv')

In [9]:
# Get the latest GTFS route - trip data from the King County Metro web server
url = 'http://metro.kingcounty.gov/GTFS/google_transit.zip'
# A timeout keeps the notebook from hanging forever if the server stops responding
r = requests.get(url, allow_redirects=True, timeout=60)
# Fail here on an HTTP error instead of saving an error page as a "zip" file
r.raise_for_status()

# Write through a context manager so the archive is flushed and closed
# before ZipFile opens it for reading
with open('google_transit.zip', 'wb') as zip_file:
    zip_file.write(r.content)
with ZipFile('google_transit.zip', 'r') as zipObj:
    zipObj.extractall('google_transit')

# Only the trip -> route mapping and the trip short name are needed downstream
gtfs_trips = pd.read_csv('google_transit/trips.txt')
gtfs_trips = gtfs_trips[['route_id', 'trip_id', 'trip_short_name']]

In [10]:
# Remove duplicate trip locations and order each trip's points chronologically
daily_results = (
    daily_results
    .drop_duplicates(subset=['tripid', 'locationtime'])
    .sort_values(by=['tripid', 'locationtime'])
)

# Pair every point with the previous row's tripdistance, locationtime, and tripid
daily_results['prev_tripdistance'] = daily_results['tripdistance'].shift(1)
daily_results['prev_locationtime'] = daily_results['locationtime'].shift(1)
daily_results['prev_tripid'] = daily_results['tripid'].shift(1)

# Remove rows with an NA in any column (this includes the very first row, which has
# no predecessor), then rows whose predecessor belongs to a different trip, i.e. the
# first recorded location of each trip. The explicit copy makes the column
# assignments below operate on an independent frame (no SettingWithCopyWarning).
daily_results = daily_results.dropna()
daily_results = daily_results[daily_results['tripid'] == daily_results['prev_tripid']].copy()

# Calculate average speed between each location bus is tracked at
# (assumes tripdistance is in meters and locationtime in seconds, per the
# column name avg_speed_m_s - TODO confirm against the data source)
daily_results['dist_diff'] = daily_results['tripdistance'] - daily_results['prev_tripdistance']
daily_results['time_diff'] = daily_results['locationtime'] - daily_results['prev_locationtime']
daily_results['avg_speed_m_s'] = daily_results['dist_diff'] / daily_results['time_diff']

# Remove rows where speed is below 0 and round to one decimal place
# (the previous bare round() call rounded to whole numbers)
daily_results = daily_results[daily_results['avg_speed_m_s'] >= 0].copy()
daily_results['avg_speed_m_s'] = daily_results['avg_speed_m_s'].round(1)

# Merge with GTFS data to get route_ids for each track
daily_results = daily_results.merge(gtfs_trips, left_on='tripid', right_on='trip_id')

In [11]:
# Concat 1 or 0 to the route id to make it fit with dynamoDB key schema
def add_one(element):
    """Append the digit 1 to the text form of ``element`` and return it as an int."""
    return int(f'{element}1')
def add_zero(element):
    """Append the digit 0 to the text form of ``element`` and return it as an int."""
    return int(f'{element}0')
# LOCAL trips get a trailing 0 and EXPRESS trips a trailing 1; rows with any other
# trip_short_name keep their route_id unchanged.
for service_type, suffix_fn in (('LOCAL', add_zero), ('EXPRESS', add_one)):
    matches = daily_results['trip_short_name'] == service_type
    daily_results.loc[matches, 'route_id'] = daily_results['route_id'].apply(suffix_fn)

In [12]:
# Flatten the segment features into one table row per coordinate so the points
# can be fed to the nearest-neighbor search.
expanded = [
    (feature['route_id'], feature['segment_id'], coord_pair)
    for feature in kcm_routes['features']
    for coord_pair in feature['geometry']['coordinates']
]
route_ids = [route_id for route_id, _, _ in expanded]
seg_ids = [segment_id for _, segment_id, _ in expanded]
feature_coords = [coord_pair for _, _, coord_pair in expanded]

# NOTE(review): standard GeoJSON orders each position as [longitude, latitude];
# this takes element 0 as lat and element 1 as lon, which assumes the modified
# file stores [lat, lon] - confirm against the data.
coord_array = np.array(feature_coords)
segments = pd.DataFrame({
    'route_id': route_ids,
    'segment_id': seg_ids,
    'lat': coord_array[:, 0],
    'lon': coord_array[:, 1],
})

In [13]:
def get_nearest(src_points, candidates, k_neighbors=1):
    """Return, for every source point, its closest candidate point.

    A BallTree using the haversine metric is built over ``candidates`` and then
    queried with ``src_points``. scikit-learn documents that the haversine metric
    takes (latitude, longitude) pairs in radians and reports distances in radians.

    Args:
        src_points: array-like of points to look up, one row per point.
        candidates: array-like of points to search among, one row per point.
        k_neighbors: number of neighbors requested from the tree; only the
            closest one is returned.

    Returns:
        Tuple ``(closest_idx, closest_dist)``: two arrays with one entry per
        source point, holding the row position of the closest candidate and the
        distance to it.
    """
    ball_tree = BallTree(candidates, leaf_size=15, metric='haversine')

    # query() returns arrays with one row per source point and one column per neighbor
    neighbor_dists, neighbor_idxs = ball_tree.query(src_points, k=k_neighbors)

    # Column 0 is the closest neighbor (column 1 would be the second closest, etc.)
    return (neighbor_idxs[:, 0], neighbor_dists[:, 0])

In [14]:
# Match every tracked location to the nearest segment point of the same route.
# Per-route results are collected in a list and concatenated once at the end,
# which avoids the quadratic cost of growing a DataFrame inside the loop
# (and DataFrame.append no longer exists in pandas 2.x).
matched_frames = []

route_list = pd.unique(daily_results['route_id'])
for route in route_list:
    route_results = daily_results[daily_results['route_id']==route]
    route_segments = segments[segments['route_id']==route].reset_index()

    if len(route_results) == 0 or len(route_segments) == 0:
        # A route was tracked by OneBusAway that does not have an id in the King County Metro shapefile
        continue

    # The haversine metric used by get_nearest works on radians, so the
    # coordinates (assumed to be stored in degrees) are converted first; the
    # returned distances are therefore in radians as well.
    result_idxs, result_dists = get_nearest(
        np.radians(route_results[['lat', 'lon']].to_numpy()),
        np.radians(route_segments[['lat', 'lon']].to_numpy()),
    )

    # Use closest index returned by get_nearest to join the closest segment info back to each datapoint
    route_results = route_results.reset_index().join(
        route_segments.loc[result_idxs, :].reset_index(), rsuffix='_seg')
    matched_frames.append(route_results)

# All of the joined data and segment data, in one dataframe to use for uploading
final_return = pd.concat(matched_frames) if matched_frames else pd.DataFrame()

In [15]:
# Average the speeds per route/segment pair, round to one decimal place, store the
# value as a string, and emit one dict per pair for the upload loop.
to_upload = (
    final_return[['route_id', 'segment_id', 'avg_speed_m_s']]
    .groupby(['route_id', 'segment_id'])
    .mean()
    .reset_index()
    .assign(avg_speed_m_s=lambda frame: frame['avg_speed_m_s'].round(1).apply(str))
    .to_dict(orient='records')
)

In [19]:
# Open a DynamoDB resource using the region and credentials held in the config module
dynamodb = boto3.resource(
    'dynamodb',
    region_name=cfg.REGION,
    aws_access_key_id=cfg.ACCESS_ID,
    aws_secret_access_key=cfg.ACCESS_KEY,
)

# Handle to the table that the per-segment speeds are written to
table = dynamodb.Table('KCM_Bus_Routes_Modified')

In [22]:
# Write the day's average speed for every route/segment pair: overwrite avg_speed
# and append the same value to the historic_speeds list.
# NOTE(review): list_append reads the existing historic_speeds attribute, so this
# presumably requires every item to already have that list - confirm the table
# was initialized with it.
for track in to_upload:
    speed = track['avg_speed_m_s']
    item_key = {'route_id': track['route_id'], 'segment_id': track['segment_id']}
    response = table.update_item(
        Key=item_key,
        UpdateExpression="SET avg_speed = :speed, historic_speeds = list_append(historic_speeds, :vals)",
        ExpressionAttributeValues={':speed': speed, ':vals': [speed]},
    )