# Extract MTA Data from archives

This notebook helps automate extracting data from the protobufs manually downloaded from [Historical GTFS data](http://web.mta.info/developers/data/archives.html)the latest source suggested at:
https://groups.google.com/d/msg/mtadeveloperresources/Whm5XTVINcE/z-LO12ANAAAJ

Note that another S3 hosted [historical datasource](http://web.mta.info/developers/MTA-Subway-Time-historical-data.html) referenced on the MTA website, but this is outdated, and the above MTA Alert Archive is correct.

NOTE: This notebook assumes that the protobufs have already been downloaded to <code>data/raw/status</code> e.g. <code>data/raw/status/201901.zip</code> from http://web.mta.info/developers/data/archives.html

In [1]:
import os
import pandas as pd
import sys
data_dir = '../data/raw/status'

In [2]:
import glob
protobuf_paths = glob.glob('{}/[0-9]*.zip'.format(data_dir))

if len(protobuf_paths) == 0:
    raise ValueError('No matching protbufs found in {}, please download from https://m.mymtaalerts.com/archive')
    
print(protobuf_paths)

['../data/raw/status/201808.zip', '../data/raw/status/201809.zip', '../data/raw/status/201810.zip', '../data/raw/status/201811.zip', '../data/raw/status/201812.zip', '../data/raw/status/201901.zip', '../data/raw/status/201902.zip', '../data/raw/status/201903.zip', '../data/raw/status/201905.zip', '../data/raw/status/201904.zip', '../data/raw/status/201906.zip']


### Helper cell for recursively unzipping monthly rollups
This is a bit finnicky, as the layout and zip format vary from month-to month, but this is a helpful tool for unzipping some of the.  This will fail for a handful of the monthly archives, and you will either need to modify it, or manually handle those cases.  Especially watch out for <code>201812.zip</code>, as that contains <code>201812.7z</code>

Additionally, there are a small number of corrupted daily zips, so this absorbs and logs those errors.

In [3]:
import zipfile
import shutil
import progressbar
import io

# Keep a list of files with failed extractions
failed_files = os.path.join(data_dir, 'failures.txt')

force = False

# unzip monthly rollups, then unzip the daily files inside
# This code is largely copied from: https://stackoverflow.com/questions/36285502/how-to-extract-zip-file-recursively-in-python
# The daily zipfiles are ~1GB, so there are big speed gains from unzipping in memory
#for monthly_file in protobuf_paths[-1:]:
for monthly_file in ['../data/raw/status/201906.zip',]:
    widgets = [progressbar.Percentage(), progressbar.Bar(), progressbar.Variable('failures')]    

    
    print("Extracting: " + monthly_file)
    z = zipfile.ZipFile(monthly_file)
    for i,f in enumerate(z.namelist()):
        print("{}/{}".format(i+1, len(z.namelist())))
        # get directory name from file
        dirname = os.path.join(data_dir, os.path.splitext(f)[0])
        # create new directory
        os.makedirs(dirname, exist_ok=True)
        # read inner zip file into bytes buffer 
        content = io.BytesIO(z.read(f))
        zip_file = zipfile.ZipFile(content)
        
        # Skip if already unzipped
        if not force:
            if len(glob.glob(dirname+'/*')) == len(zip_file.namelist()):
                print("Skipping " + os.path.basename(dirname))
                continue
         
        # Iterate through in-memory zipfile, dumping sub-minutely protobufs into daily directories
        bar = progressbar.ProgressBar(widgets=widgets, max_value=len(zip_file.namelist()), min_poll_interval=.5).start()
        failures = 0
        for j,f2 in enumerate(zip_file.namelist()):
            try:
                zip_file.extract(f2, dirname)
            except Exception as e:
                # At the moment, some messages a sporadically unable to parse
                with io.open(failed_files, 'a') as fh:
                    fh.write(f2+'\n')
                failures += 1
                
            sys.stdout.flush()
            bar.update(j+1, failures=failures)
        zip_file.close()
        
        bar.finish()
    
    

Extracting: ../data/raw/status/201906.zip
1/30


  5%|##                                                       |failures: ------

KeyboardInterrupt: 

# Extract GTFS into H5
Extract relevant info from GTFS files to H5.  Right now this is just extracting the current train in a station at a given time, and information about which train is expected to arrive next, and when.  It is possible that more information will be added, but this is too heavy-weight to include in [DatExploration.ipynb](DataExploration.ipyny), and should only be run as part of data processing

In [4]:
from functools import lru_cache

In [27]:
# relative MTADelayPredict Project
import sys
import os
import pandas as pd

sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(os.path.join('DataExploration.ipynb')))))
from MTADelayPredict.utils import gtfs_loader
from importlib import reload
reload(gtfs_loader)

<module 'MTADelayPredict.utils.gtfs_loader' from '/opt/project/MTADelayPredict/utils/gtfs_loader.py'>

In [28]:
# Create loaders for loading data in monthly chunks
loader_start = pd.Timestamp('2018-08-01 00:00:00')
loader_end =  pd.Timestamp('2019-07-01 00:00:00')
drange = pd.date_range(start=loader_start, end=loader_end, freq='M')
loaders = []

# Build separate loaders per month
for i,m in enumerate(drange):
    start_date = m.replace(day=1)
    end_date = m.replace(hour=23,minute=59,second=59)
    loader = gtfs_loader.GTFSLoader(data_dir=os.path.join('../data/raw/status'), \
                                train_line='nqrw')
    loaders.append((loader, start_date, end_date))
    

In [None]:
# Load months in a pool and cache to h5
from multiprocessing import Pool
#STOP_FILTER = '^R16N$'
STOP_FILTER = '^.*N$'
ROUTE_FILTER = 'N'

def f(args):
    loader, start_date, end_date = args
    loader.load_range(start_date, end_date, stop_filter=STOP_FILTER, route_filter=ROUTE_FILTER, verbose=True)
    filename = os.path.join(data_dir, 'status_{:04d}{:02d}.h5'.format(start_date.year, start_date.month))
    loader.stopped_at_df.to_hdf(filename, key='stopped_at')
    loader.next_train_df.to_hdf(filename, key='next_train')
    loader.next_scheduled_arrival_df.to_hdf(filename, key='next_scheduled_arrival')
    
    return "{} - {} Done".format(start_date, end_date)

with Pool(6) as p:
     print(p.map(f, loaders))
    

  2%|                                     |entries:   5096decode_errors:     35