# rollup_generator_dev

This is the dev notebook for the rollup generator script.

First thing first, let's see how the MTA archives are set up.

In [1]:
import zipfile

In [2]:
z = zipfile.ZipFile('/Users/alex/Downloads/201906.zip')

In [3]:
z.namelist()

['20190601.zip',
 '20190602.zip',
 '20190603.zip',
 '20190604.zip',
 '20190605.zip',
 '20190606.zip',
 '20190607.zip',
 '20190608.zip',
 '20190609.zip',
 '20190610.zip',
 '20190611.zip',
 '20190612.zip',
 '20190613.zip',
 '20190614.zip',
 '20190615.zip',
 '20190616.zip',
 '20190617.zip',
 '20190618.zip',
 '20190619.zip',
 '20190620.zip',
 '20190621.zip',
 '20190622.zip',
 '20190623.zip',
 '20190624.zip',
 '20190625.zip',
 '20190626.zip',
 '20190627.zip',
 '20190628.zip',
 '20190629.zip',
 '20190630.zip']

In [3]:
zz = zipfile.ZipFile(z.open('20190601.zip'))

In [14]:
import numpy as np
import pandas as pd

pd.Series(zz.namelist()).sample(10)

7353        gtfs_j_20190601_092434.gtfs
21569     gtfs_ace_20190601_210034.gtfs
724         gtfs_g_20190601_044646.gtfs
22305       gtfs_L_20190601_213845.gtfs
23883       gtfs_L_20190601_230215.gtfs
19315     gtfs_ace_20190601_190258.gtfs
13961       gtfs_g_20190601_143345.gtfs
2957     gtfs_bdfm_20190601_061631.gtfs
11341     gtfs_ace_20190601_123119.gtfs
281         gtfs_g_20190601_042920.gtfs
dtype: object

So the MTA GTFS-RT archive is per-month, and is in the form of a zip of zips, where the second layer of zips is snapshots per feed per day per minute. Weirdly the feed IDs that you read from the API are numeric, whilst the feed IDs in the archive are in terms of the train lines they cover.

This structure means that it is most efficient to parse a month's worth of data at time. The compressed file is 34 GB; the uncompressed file is much larger, but there's no reason to uncompress everything, as it's much more efficient to uncompress slices of the data, and work in batches.

Although the archival data is provided in monthly form, it makes the most sense to provide the output data in daily form. This is a much more workable slice of data if you want to e.g. experiment, and it's easy to download a bunch of different objects with a for loop.

Unforunately the only way to detect a trip that started in the 0:00-00:01 interval is to read in the 23:59-00:00 interval from the previous day, which requires downloading the entire previous month's worth of data for a single record.

The same problem occurs on the opposite, tail end. Since we are using ragged-right time intervals ("every trip that started on day X"), trips will carry over into the following day. In the case of the last day of the month, this will require also reading in the first few hours worth of data from the folllowing day, which will require downloading *that* whole month's worth of data.

So overall, to generate a month's worth of rollups for, say, June, we would also need to download all of May's data and all of July's data. This does unfortunately mean that we have a lag period of one month, plus however long it takes the MTA to upload the archives.

All of this will raise the time required to do a parse run, but probably not by that much, seeing as how the total time cost will be compute-dominated anyway.

Let's next consider some edge cases. The time appears to be in hourly EST timestamps, but let's make absolutely sure they aren't using UTC or something.

In [4]:
# the following code taken from the gtfs-tripify codebase

import warnings
import numpy as np
import pandas as pd
from google.transit import gtfs_realtime_pb2

def parse_feed(bytes):
    """
    Helper function for reading a feed in using Protobuf. 
    Handles bad feeds by replacing them with None.
    """
    # TODO: tests.
    with warnings.catch_warnings():
        warnings.simplefilter("error")

        try:
            fm = gtfs_realtime_pb2.FeedMessage()
            fm.ParseFromString(bytes)
            return fm

        # Protobuf occasionally raises an unexpected tag RuntimeWarning. This occurs when a
        # feed that we read has unexpected problems, but is still valid overall. This 
        # warning corresponds with data loss in most cases. `gtfs-tripify` is sensitive to the
        # disappearance of trips in the record. If data is lost, it's best to excise the 
        # message entirely. Hence we catch these warnings and return a flag value None, to be
        # taken into account upstream. For further information see the following thread:
        # https://groups.google.com/forum/#!msg/mtadeveloperresources/9Fb4SLkxBmE/BlmaHWbfw6kJ
        except RuntimeWarning:
            warnings.warn(
                f"The Protobuf parser raised a RunTimeWarning while parsing an update, indicating "
                f"possible corruption and/or loss of data. This update cannot be safely used "
                f"upstream and has been dropped."
            )
            return None

        # Raise for all other errors.
        except:
            raise

In [25]:
zz.open(zz.namelist()[0])

<zipfile.ZipExtFile name='gtfs_L_20190601_035944.gtfs' mode='r'>

In [23]:
parse_feed(zz.open(zz.namelist()[0], 'r').read()).header.timestamp

1559375984

This sample archival file purports to be for the timestamp `20190601_035944`. That translates to June 1st, 2019 at 03:59:44 local time (e.g. 3:59 AM, the 44th second thereof). The Unix timestamp in the GTFS-RT file maps to `06/01/2019 @ 7:59am` exactly, per [this website](https://www.unixtimestamp.com/index.php). Interestingly enough, it appears that the least significant digit in the time included in the file name is the second, whilst the least significant digit in the time included in the GTFS-RT message itself is the minute, e.g. the second has been stripped off. Anyway, the time does match up to EST.

Storing data in timezone-aware format like this is risky. Phenomena like daylight savings time and leap seconds can make this time ambiguous. Unix time, on the other hand, is unambiguous, so the time information on the file itself is what we will trust when we are attempting to order the buffers.

I think we can trust the coverage of the files (e.g. they really do cover "the whole month"). Note that the exact amount of time covered in a month may differ due to time differences. It is also possible that some files will be missing outright, we need to account for this possibility in our code.

In [37]:
from datetime import datetime, timedelta
from dateutil import tz
YEARMONTH = '201904'

datetime(int(YEARMONTH[:4]), int(YEARMONTH[-2:].lstrip('0')), 1, 
         tzinfo=tz.gettz('America/New_York')) - timedelta(minutes=1)

datetime.datetime(2019, 3, 31, 23, 59, tzinfo=tzfile('/usr/share/zoneinfo/America/New_York'))

Once we have the list of files, 
We need to parse the names of the files in order to:
* Sort them into feed categories.
* Determine their order.

In [5]:
nameparts = []
for n in zz.namelist():
    assert 'gtfs_' in n and '.gtfs' in n
    n = n.replace('gtfs_', '')[:-5]
    lines_covered = n[:n.find('_')].upper()
    nameparts.append(lines_covered)

In [6]:
pd.Series(nameparts).value_counts()

7       4542
G       4293
J       4258
NQRW    3431
ACE     3301
BDFM    3102
L       2399
dtype: int64

This is odd. Many of the train lines are missing. Elsewhere, there are differences between the lines purported to by covered by the feed in the feed name, and the lines actually covered in the direct-access API. The feedmap for the API below for comparison:

In [7]:
FEEDMAP = {
    1:  ['1', '2', '3', '4', '5', '6', 'GS'],
    2:  ['L'],
    11: ['SI'],
    16: ['N', 'Q', 'R', 'W'],
    21: ['B', 'D', 'F', 'M'],
    26: ['A', 'C', 'E', 'H', 'FS'],
    31: ['G'],
    36: ['J', 'Z'],
    51: ['7']
}

In [30]:
%%time 

from tqdm import tqdm

def is_vehicle_update(message):
    return str(message.trip_update.trip.route_id) == '' and str(message.alert) == ''

def is_alert(message):
    return str(message.alert) != ''

def is_trip_update(message):
    return not is_vehicle_update(message) and not is_alert(message)

trip_id_unassigned_count = 0
parse_error_count = 0
msg_lines = []

for n in zz.namelist():
    msg_line = []
    try:
        buffer = parse_feed(zz.open(n, 'r').read())
    except:  # parsing failed, bad message, skip it
        parse_error_count += 1
        continue
    for entity in buffer.entity:
        if is_trip_update(entity):
            if entity.trip_update.trip.route_id == "":
                trip_id_unassigned_count += 1
            msg_line.append(entity.trip_update.trip.route_id)
    msg_lines.append(msg_line)

CPU times: user 1min 28s, sys: 1.55 s, total: 1min 30s
Wall time: 1min 37s


In [31]:
trip_id_unassigned_count

0

In [32]:
parse_error_count

2

In [33]:
train_lines_represented = []
for msg_line in msg_lines:
    train_lines_represented.append(''.join(np.sort(pd.Series(msg_line).unique())))

In [34]:
pd.Series(train_lines_represented).value_counts()

7         4339
G         4292
J         4258
NQR       3361
DFM       3089
L         2399
ACEH      2039
ACEFSH    1154
77X        203
AEFSH      107
DNQR        70
DFMR        13
dtype: int64

Wow, not only are the 1, 2, 3, 4, 5, 6 trains still missing, but the W is as well, and the R is somehow showing up in the `DFM` feed instead of the `NQR` feed, where it belongs, on certain occassions.

It appears that the feeds that are not included are straight-up missing from the rollup. This must be a data error on the MTA's part.

In [39]:
len(zz.namelist()), len(msg_lines)

(25326, 25324)

Let's try a different day from a different month (the different month is not necessary, but I had another month's of data available so why not). Say, Febuary 2019.

In [40]:
%%time

zz = zipfile.ZipFile(zipfile.ZipFile('/Users/alex/Downloads/201902.zip').open('20190201.zip'))

from tqdm import tqdm

def is_vehicle_update(message):
    return str(message.trip_update.trip.route_id) == '' and str(message.alert) == ''

def is_alert(message):
    return str(message.alert) != ''

def is_trip_update(message):
    return not is_vehicle_update(message) and not is_alert(message)

trip_id_unassigned_count = 0
parse_error_count = 0
msg_lines = []

for n in zz.namelist():
    msg_line = []
    try:
        buffer = parse_feed(zz.open(n, 'r').read())
    except:  # parsing failed, bad message, skip it
        parse_error_count += 1
        continue
    for entity in buffer.entity:
        if is_trip_update(entity):
            if entity.trip_update.trip.route_id == "":
                trip_id_unassigned_count += 1
            msg_line.append(entity.trip_update.trip.route_id)
    msg_lines.append(msg_line)

CPU times: user 1min 25s, sys: 3.18 s, total: 1min 28s
Wall time: 1min 34s


In [41]:
trip_id_unassigned_count

0

In [42]:
parse_error_count

166

In [43]:
train_lines_represented = []
for msg_line in msg_lines:
    train_lines_represented.append(''.join(np.sort(pd.Series(msg_line).unique())))
    
pd.Series(train_lines_represented).value_counts()

G         4027
J         2988
77X       2707
ACEFSH    2676
NQRW      2653
BDFM      2421
7         1527
JZ         955
           675
NQR        315
DFM        248
AEFSH      115
ACEH        76
A            9
ACE          7
AC           6
NW           4
NQW          4
Q            3
BDF          2
ACFSH        1
BD           1
DF           1
NRW          1
ACEFS        1
dtype: int64

The W reports in this one, but the 1,2,3,4,5,6 are still missing.

Let's look at a couple more days.

In [47]:
def report(zz):
    parse_error_count = 0
    trip_id_unassigned_count = 0
    
    for n in zz.namelist():
        msg_line = []
        try:
            buffer = parse_feed(zz.open(n, 'r').read())
        except:  # parsing failed, bad message, skip it
            parse_error_count += 1
            continue
        for entity in buffer.entity:
            if is_trip_update(entity):
                if entity.trip_update.trip.route_id == "":
                    trip_id_unassigned_count += 1
                msg_line.append(entity.trip_update.trip.route_id)
        msg_lines.append(msg_line)
        
    train_lines_represented = []
    for msg_line in msg_lines:
        train_lines_represented.append(''.join(np.sort(pd.Series(msg_line).unique())))
    return trip_id_unassigned_count, parse_error_count, pd.Series(train_lines_represented).value_counts()

In [48]:
trip_id_unassigned_count, parse_error_count, train_lines_represented = report(
    zipfile.ZipFile(z.open('20190615.zip'))
)

In [49]:
train_lines_represented

G         8355
J         7241
ACEFSH    5812
7         5743
NQR       3724
DFM       3310
77X       2967
NQRW      2653
L         2440
BDFM      2421
JZ         955
           689
AEFSH      277
ACEH        88
A            9
ACE          7
AC           6
NQW          4
NW           4
Q            3
BDF          2
ACFSH        1
BD           1
ACEFS        1
DF           1
NRW          1
dtype: int64

I now suspect that the {1, 2, 3, 4, 5, 6} dropped out the archival service at some point, and no one noticed, because it is inexplicibly absent from three different random days now.

We are going to proceed with the development process with this likelihood in mind (because we sure as hell are not going to develop our own independent archival service). When we provide the output data to users, we will do so with the note that certain trains may be absent from certain feeds. We will build an incident report for this archival service manually, by writing a reporting dashboard that takes the logified outputs as input and returns line plots of the rate counts per hour as output. This will allow us to flag when feeds drop from the archive. This will be a "next step" after we generate the rollups.

We return to analyzing name-parts. We now know that trains may rarely end up shuffled into the wrong feed. We will ignore this problem, because this is convoluted logic I do not want in my fetch script. Instead we'll just sort by name.

In [60]:
from collections import defaultdict

# form a map of the form:
# {'<$LINE_IDENTIFIER>_<$YEAR><$MONTH><$DAY>': [...list of indices in zz.namelist()]}
# e.g.:
# {'g_20190201': [0, 6, 12, ...],
#  'j_20190201': [1, 7, 13, ...],
#  ...
# }
name_map = defaultdict(list)

for idx, n in enumerate(zz.namelist()):
    assert 'gtfs_' in n and '.gtfs' in n
    n = n.replace('gtfs_', '')[:-5][:-7]
    name_map[n].append(idx)

In [None]:
from datetime import datetime, timedelta
from dateutil import tz
import os
import json
import itertools
import boto3
from botocore.client import Config
import gtfs_tripify as gt
from zipfile import ZipFile
import requests
from collections import defaultdict
import numpy as np


AWS_BUCKET_NAME = 'gtfsarchive'
S3_TIMEOUT = 60
YEARMONTH = '201904'  # user-specified at runtime
END_OF_MONTH_OVERFLOW_BUFFER_ARITY = 5 * (120)  # 5 hours, assuming 2 updates/minute


new_york_tz = tz.gettz('America/New_York')
current_month_first_minute = datetime(
    int(YEARMONTH[:4]), 
    int(YEARMONTH[-2:].lstrip('0')),
    1, 
    tzinfo=tz.gettz('America/New_York')
)
current_month, current_month_year = (
    current_month_first_minute.month,
    current_month_first_minute.year
)
last_minute_day_before_current_month = current_month_first_minute - timedelta(minutes=1)
last_month, last_month_year = (
    last_minute_day_before_current_month.month,
    last_minute_day_before_current_month.year
)
next_month_overflow_time = current_month_first_minute + timedelta(
    months=1
)
next_month, next_month_year = (
    next_month_overflow_time.month,
    next_month_overflow_time.year
)
CURRENT_MONTH_DOWNLOAD_URL =\
    f'https://s3.amazonaws.com/{AWS_BUCKET_NAME}/Data/{str(current_month)}{str(current_month_year)}.zip'
LAST_MONTH_DOWNLOAD_URL =\
    f'https://s3.amazonaws.com/{AWS_BUCKET_NAME}/Data/{str(last_month)}{str(last_month_year)}.zip'
NEXT_MONTH_DOWNLOAD_URL =\
    f'https://s3.amazonaws.com/{AWS_BUCKET_NAME}/Data/{str(current_month)}{str(current_month_year)}.zip'


def download_file(url, local_filename):
    """
    Streaming download to filename, taken from https://stackoverflow.com/a/16696317/1993206.
    """
    local_filename = url.split('/')[-1]
    # NOTE the stream=True parameter below
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192): 
                if chunk: # filter out keep-alive new chunks
                    f.write(chunk)
                    # f.flush()
    return local_filename

def generate_name_map(namelist):
    """
    Form a map of the form:
    
    {'<$LINE_IDENTIFIER>_<$YEAR><$MONTH><$DAY>': [...list of indices in namelist()], ...}

    E.g.:

    {'g_20190201': [0, 6, 12, ...], 'j_20190201': [1, 7, 13, ...], ...}
    
    This map is used to batch input to gtfs_tripify.
    """
    name_map = defaultdict(list)
    for idx, n in enumerate(namelist()):
        assert 'gtfs_' in n and '.gtfs' in n
        n = n.replace('gtfs_', '')[:-5][:-7]
        name_map[n].append(idx)
    return name_map

# only need the last available file (per batch) of the last month file
download_file(LAST_MONTH_DOWNLOAD_URL, 'last_month.zip')
z = ZipFile('last_month.zip')
namelist = np.array(z.namelist())
namemap = generate_name_map(namelist)
last_month_final_updates_namemap = {key: namemap[key][-1] for key in namename}
for key in last_month_final_updates_namemap:
    z.extract(last_month_final_updates_namemap[key], 'TODO: SOME ID')
os.remove('last_month.zip')

# only need the first END_OF_MONTH_OVERFLOW_BUFFER_HOURS of the next month file
download_file(NEXT_MONTH_DOWNLOAD_URL, 'next_month.zip')
z = ZipFile('next_month.zip')
namelist = np.array(z.namelist())
next_month_first_updates_namemap = {
    key: namemap[key][:END_OF_MONTH_OVERFLOW_BUFFER_ARITY] for key in namename
}
os.remove('next_month.zip')

# need to entire current month file
download_file(CURRENT_MONTH_DOWNLOAD_URL, 'this_month.zip')


