In [1]:
from IPython.display import HTML
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import arrow
import requests
import json
import re
import glob
import os
import ConfigParser

# Table of contents
* [Introduction](#introduction)
* [To Do](#todo)
* [Loading Raw Vehicle Position Data](#loading)
* [Downloading and Parsing Patterns](#patterns)
* [Cleaning the raw data](#cleaning)
* [Building timetables](#timetable)
* [Derived Data](#derived)
    * [Trip and Wait Times](#trips_waits)

## Introduction <a name="introduction"></a>
This notebook contains the scripts used to clean and transform bus position data collected from the **`getvehicles`** API. The data processing is conducted as follows:
1. Load the raw vehicle position data into a pandas DataFrame. Filter out any undesired bus routes.
2. Download bus route patterns from the **`getpatterns`** API.
3. Load route patterns into a pandas DataFrame.
4. Clean the raw vehicle position data, including removing duplicate rows, and assigning each bus trip a unique ID.
5. For each bus route, transform the raw vehicle data into a timetable DataFrame, with bus stops as the columns and individual trips as rows.
6. Using each bus route's timetable DataFrame, calculate the travel times between each bus stop and the wait times between adjacent buses at each stop, and store in a new DataFrame.

## To Do <a name="todo"></a>
* Breakup Jupyter notebook into standalone Python scripts for data processing
* Flag bad vehicle position data? e.g. Check monotonicity, count number of data points.
* Write functions to keep track of access date when downloading patterns from the **`getpatterns`** API, and create new file if pattern has changed
* Load raw pattern data in a more elegant and simpler way. Current loading process is a holdover from a different way of processing patterns data.
* Edit code for consistent style.
* Completed detailed function descriptions and useage notes.

## Loading Raw Vehicle Position Data <a name="loading"></a>

`load_raw_data`  
Loads raw vehicle position data from possibly multiple file sources into one DataFrame. 

In [2]:
def load_raw_data(path, file_stem):
    names = ['tripid', 'tmstmp', 'pid', 'rt', 'pdist', 'dly']
    
    all_files = glob.glob(os.path.join(path, "*{}.csv".format(file_stem)))
    df_each = (pd.read_csv(f, skiprows=1, names=names, dtype=str) for f in all_files)
    df = pd.concat(df_each, ignore_index=True)
    
    df.dropna(how='any', inplace=True)
    df.tmstmp =  pd.to_datetime(df.tmstmp)
    df.pdist = df.pdist.astype(int)
    
    return df

## Downloading and Parsing Patterns <a name="patterns"></a>

`check_if_path_exists`  
Checks if the given directories exists. If they do not exist, it creates them.

`get_patterns`  
Queries the **`getpatterns`** API to obtain information patterns present in given DataFrame. Each pattern is written to a separate .json file of the form *RouteNumber_PatternID.json*.

Note: It is not possible to use the **`getpatterns`** API to access all patterns associated with a particular bus route. One either has the option to request all *active* patterns for a particular bus route or to request patterns individually by their pattern ID. This script does the latter.

`untangle_pattern_columns` and `pattern_dict_to_df` are supporting functions for `load_patterns`.

`load_patterns`
Loads the patterns for a particular bus route into a DataFrame.

In [3]:
config = ConfigParser.ConfigParser()
config.read("../../keys.config")
API_KEY = config.get("ctabustracker", "api_key")
URL = "http://www.ctabustracker.com/bustime/api/v2/getpatterns"
patterns_path = "../../data/raw/getpatterns/"

def check_if_path_exists(path):
    try:
        os.makedirs(path)
    except OSError:
        if not os.path.isdir(path):
            raise

def get_patterns(df):
    check_if_path_exists(patterns_path)
    
    pids = [str(pid) for pid in df.pid.unique()]
    # the getpatterns API only accepts upto 10 patterns at a time
    pids_chunks = [pids[i:i+10] for i in xrange(0, len(pids), 10)]
    for chunk in pids_chunks:
        pids_str = ",".join(chunk)
        payload = {'key': API_KEY, 'pid': pids_str, 'format': "json"}      
        r = requests.get(URL, params=payload)
        patterns = r.json().get('bustime-response').get('ptr')
        
        if not patterns:
            print "A very bad error has occurred"
              
        for pattern in patterns:
            pid = str(pattern['pid'])
            rt = str(df[df.pid == pid].rt.unique()[0])
            with open(os.path.join(patterns_path, "{}_{}.json".format(rt, pid)), 'w') as out_file:
                json.dump(pattern, out_file)
                
def untangle_pattern_columns(df, pid):
    stops_df = pd.DataFrame(df.loc['pt', pid])
    pattern_info_df = pd.DataFrame(df.loc[~df.index.isin(['pt']), pid]).T.reset_index(drop=True)
    pattern_df = stops_df.join(pattern_info_df)
    ff_cols = ['first', 'last', 'ln', 'pid', 'rtdir']
    pattern_df[ff_cols] = pattern_df[ff_cols].ffill()
    pattern_df.pid = pattern_df.pid.astype(str)
    return pattern_df

def pattern_dict_to_df(json_dict):
    df = pd.DataFrame(json_dict)
    new_df = pd.concat([untangle_pattern_columns(df, pid) for pid in df.columns]).reset_index(drop=True)
    new_df.drop(new_df[new_df.typ == "W"].index, inplace=True)
    return new_df

def load_patterns(rt):
    pattern_dict = {}
    
    for file in glob.glob(os.path.join(patterns_path, "{}_*".format(rt, pid))):
        with open(file) as f:    
            pattern = json.load(f)
            
        stops = [stop for stop in pattern['pt'] if stop['typ'] != "W"]
        pattern['first'] = stops[0]['stpnm']
        pattern['last'] = stops[-1]['stpnm']
        pattern_dict[pattern['pid']] = pattern
    
    return pattern_dict_to_df(pattern_dict)

## Cleaning the raw data <a name="cleaning"></a>

`remove_unknown_patterns`  
Removes trips with pIDs for which there is no pattern data.

`create_unique_id`  
Tripids alone are not necessarily unique identifiers of a trip on a given route. This function creates a unique identifier for each trip by combining the start date of the trip, the pID of pattern it is executing, and its CTA-assigned tripid.

Note: For the purposes of the CTA's scheduling, the new service day starts around 3AM. For example, if trip initially departs at 2AM on 2017-01-02, then the trip was scheduled for the 2017-01-01 service day. **Similarly, in this project 3AM US/Central time is treated as the start of a new day.**

`remove_short_trips`  
Removes data where bus travels less than 5,000 ft.

`clean`  
Removes duplicate rows and executes the above three functions.

In [4]:
def remove_unknown_patterns(df, patterns):    
    not_include = (set(df.pid) - set(patterns.pid))
    df.drop(df[df.pid.isin(not_include)].index, inplace=True)
    print "Deleted pattern IDs {}".format(list(not_include))

def create_unique_ids(df):
    df["unix_tmstmp"] = df.tmstmp.apply(lambda x: arrow.get(x, 'US/Central').timestamp)
    df.sort_values(['tripid', 'tmstmp'], inplace=True)
    # If two data points with same tripID are more than 30 minutes a part, they probably belong to different trips
    # In practice, such data points will usually (but not always), be at least 24 hours apart.
    g = df.groupby(['tripid', (df.tmstmp.diff() > pd.Timedelta('30 minutes')).astype(int).cumsum()])
    idxmins = g.unix_tmstmp.idxmin()
    df_idxmins = df.loc[idxmins]
    df.loc[idxmins, "ID"] = (df_idxmins.tmstmp - pd.DateOffset(hours=3)).dt.strftime('%Y%m%d') + "_" + df_idxmins.pid + "_" + df_idxmins.tripid
    df.ID.ffill(inplace=True)

def remove_short_trips(df):
    df.drop(df.groupby('ID').filter(lambda x: x.pdist.max() - x.pdist.min() < 5000).index, inplace=True)
            
def clean(df, patterns):
    df.drop_duplicates(inplace=True)
    remove_unknown_patterns(df, patterns)
    create_unique_ids(df)
    remove_short_trips(df)

## Building timetables <a name="timetable"></a>

`interpolate_arrival_times`  
Interpolates the DateTimes that the given bus trip reaches all of the provided stops along its route. Input should be given as a DataFrame with rows corresponding only to a single trip. Returns a list of date strings or np.nan if the bus does not reach a particular stop. Interpolation is performed as follows: given the raw location data for a single trip and the location of a bus stop, first sort the data into two parts: all of the raw data points where the bus is before the bus stop and all of the raw data points where the bus is at or past the bus stop. Next, select the row from the before table with the latest timestamp (before_max) and select the row from the after table with the earliest timestamp (after_min). Find the equation of the line connecting these two points where the input is pdist (distance) and the output is timestamp (time) and evaluate the equation at the pdist of the bus stop.

In [5]:
def build_query_strings(stop, patterns):
    filtered = patterns[patterns.stpnm == stop][["pid", "pdist"]]
    #stop_pdist = shift_terminal_stop_pdist(patterns, pattern, stop)

    query_str_before = " | ".join(["(pid == '{}' & pdist < {})".format(pid, pdist) for pid, pdist in zip(filtered.pid, filtered.pdist)])
    query_str_after = " | ".join(["(pid == '{}' & pdist >= {})".format(pid, pdist) for pid, pdist in zip(filtered.pid, filtered.pdist)])
    return query_str_before, query_str_after

def find_linear_interpolant_endpoints(df, stop, patterns):
    query_str_before, query_str_after = build_query_strings(stop, patterns)

    idxmaxs = df.query(query_str_before).groupby('ID').unix_tmstmp.idxmax()
    idxmins = df.query(query_str_after).groupby('ID').unix_tmstmp.idxmin()

    before = df.loc[idxmaxs, ["pdist", "unix_tmstmp", "ID"]].set_index("ID")
    after = df.loc[idxmins, ["pdist", "unix_tmstmp", "ID"]].set_index("ID")

    return before, after

def evaluate_linear_interpolant(tableaux, stop):
    interpolated_arrivals = (
            ((tableaux.unix_tmstmp_after - tableaux.unix_tmstmp) / (tableaux.pdist_after - tableaux.pdist))
            * (tableaux.stop_pdist - tableaux.pdist)
            + tableaux.unix_tmstmp
        ) 
    mask = pd.notnull(interpolated_arrivals)
    interpolated_arrivals.loc[mask] = interpolated_arrivals[mask].map(lambda x: arrow.get(x).to('US/Central').format('YYYY-MM-DD HH:mm:ss'))
    return interpolated_arrivals

def interpolate_stop_arrival_times(df, stop, patterns):
    tableaux = pd.DataFrame(np.nan, index=df.ID.unique(), columns=[stop])
    tableaux = tableaux.join(df.groupby('ID').pid.first())

    before, after = find_linear_interpolant_endpoints(df, stop, patterns)
    tableaux = tableaux.join(before, rsuffix="_before").join(after, rsuffix="_after")
    
    pid_to_pdist = patterns[patterns.stpnm == stop].groupby('pid').pdist.first()
    mask = tableaux.pid.isin(pid_to_pdist.index)
    tableaux.loc[mask, "stop_pdist"] = tableaux[mask].pid.map(pid_to_pdist)

    interpolated_arrivals = evaluate_linear_interpolant(tableaux, stop)
    return interpolated_arrivals

`build_bidirectional_timetable`  
Builds a timetable DataFrame of interpolated arrival/departure times for each bus stop in either service direction on given route.

In [6]:
# Holiday schedules
# Our services operate on a Sunday schedule on New Year’s Day, Memorial Day,
# July 4th (Independence Day), Labor Day, Thanksgiving Day and Christmas Day.
holidays = [
    "2017-01-01", "2017-05-29", "2017-07-04", "2018-09-04", "2018-11-23", "2017-12-25",
    "2018-01-01", "2018-05-28", "2018-07-04", "2018-09-03", "2018-11-22", "2018-12-25",
    "2019-01-01", "2019-05-27", "2019-07-04", "2019-09-02", "2019-11-28", "2019-12-25"
]
cta_holidays = pd.DatetimeIndex(holidays)
timetables_path = "../../data/processed/timetables/"

def build_bidirectional_timetable(df, patterns):
    stop_list = patterns.stpnm.dropna().unique()
    timetable = pd.DataFrame(np.nan, index=df.ID.unique(), columns=stop_list)
    timetable.index.name = "ID"

    for stop in stop_list:
        timetable[stop] = interpolate_stop_arrival_times(df, stop, patterns)
    
    timetable.reset_index(inplace=True)
    timetable[["start_date", "pid", "tatripid"]] = timetable.ID.str.split("_", expand=True)
    timetable["rtdir"] = timetable.pid.map(patterns.groupby('pid').rtdir.first())
    timetable["start_date"] = pd.to_datetime(timetable.start_date)
    timetable["day_of_week"] = timetable.start_date.dt.dayofweek
    timetable["holiday"] = timetable.start_date.isin(cta_holidays)
    return timetable

def write_timetables(df):
    timetable = interpolate_arrival_times(df)
    check_if_path_exists(timetables_path)
    #split timetable into two pieces based on patterns
    #.to_csv(timetables_path + str(rt) + "_timetable_" + (direction[0].lower() + "b") + ".csv", index=False)
    
def load_timetable(filename):
    timetable = pd.read_csv(timetables_path + filename)
    timetable[stop_list] = timetable[stop_list].apply(pd.to_datetime)
    return timetable

`shift_terminal_stop_pdists`  
There is a significant amount of noise at the terminal stops of a route. This function shifts the location of the terminal stops inward by 500 feet. For example, if the first stop is located has a pdist of 0ft and the final stop has a pdist of 50,000ft, then `interpolate_arrival_times` treats these stops as being located at 500ft and 49,500ft, respectively.

In [7]:
def shift_terminal_stop_pdists(patterns):
    patterns.loc[np.intersect1d(patterns.groupby('pid').seq.idxmin(), patterns[patterns.pdist < 500].index), "pdist"] = 500

    idxs = np.intersect1d(patterns.groupby('pid').seq.idxmax(), patterns[patterns.pdist > patterns.ln - 500].index)
    pattern_lengths = patterns.loc[idxs].ln 
    patterns.loc[idxs, "pdist"] = pattern_lengths - 500
    
def process_raw_data(path, stem, rt):
    df = load_raw_data(path, stem)
    df.drop(df[df.rt != rt].index, inplace=True)
    patterns = load_patterns(rt)
    shift_terminal_stop_pdists(patterns)
    clean(df, patterns)
    build_timetable(df, patterns)

## Derived Data<a name="#derived"></a>
### Trip and Wait Times<a name="#trips_waits"></a>

In [8]:
def get_destination_stops(patterns, stop, direction):
    filtered = patterns[(patterns.stpnm == stop) & (patterns.rtdir == direction)]
    query_str = " | ".join(["(pid == '{}' & seq > {})".format(pid, seq) for pid, seq in zip(filtered.pid, filtered.seq)])
    return list(patterns.query(query_str).stpnm.unique())

def timedelta_to_decimal(td):
    return round(abs((td / np.timedelta64(1, 'D')) * 1440), 2)

def calculate_travel_times(df, origin, destinations):
    df[destinations] = df[destinations].sub(df[origin], axis=0)
    df[destinations] = df[destinations].apply(timedelta_to_decimal)
    return df

def calculate_wait_times(df, stop):
    df["wait_time"] = df[stop] - df[stop].shift(-1)
        
    # removes wait times calculated between service days
    df["day_diff"] = df.start_date - df.start_date.shift(-1)
    df.loc[df.day_diff.dt.days != 0, "wait_time"] = None
    
    df.drop(columns=[stop, "day_diff"], inplace=True)
    df.wait_time = df.wait_time.apply(timedelta_to_decimal)
    return df

def build_travel_waits_df(df, patterns, direction):
    info_columns = ["start_date", "pid", "tatripid", "rtdir", "day_of_week", "holiday", "decimal_time", "wait_time"]
    
    stop_list = patterns[patterns.rtdir == direction].stpnm.dropna().unique()
    directional_df = df.loc[df.rtdir == direction]
    directional_df[stop_list] = directional_df[stop_list].apply(pd.to_datetime)
    
    travels_waits = []
    for origin in stop_list:
        destinations = get_destination_stops(patterns, origin, direction)
        
        if not destinations:
            continue

        orgin_and_dests = [origin] + destinations
        sorted_df = directional_df.sort_values(by=origin) # sorting by arrival times in origin column
        sorted_df["decimal_time"] = sorted_df[origin].map(lambda x: round(x.hour + x.minute / 60.0, 2))

        sorted_df = calculate_travel_times(sorted_df, origin, destinations)        
        sorted_df = calculate_wait_times(sorted_df, origin)

        melted_df = pd.melt(sorted_df, id_vars=info_columns, value_vars=destinations, var_name="destination", value_name="travel_time")
        melted_df.dropna(subset=["wait_time", "travel_time"], inplace=True)
        melted_df["origin"] = origin
        travels_waits.append(melted_df)
    return pd.concat(travels_waits, ignore_index=True)

def write_travel_waits():
    #travel_waits_path = "../../data/processed/trips_and_waits/" + str(rt) + "/"
    #check_if_path_exists(travel_waits_path)
    #file_name = travel_waits_path + origin.replace("/", "").replace(".", "") + "_" + direction + ".csv"
    #melted_df.to_csv(file_name, columns=header, header=False, index=False, mode='ab+')
    pass