Trip log joins are an operational necessity, but they currently run too slowly. Let's speed them up.

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from google.transit import gtfs_realtime_pb2
import sys; sys.path.append("../src/")
from processing import parse_feeds_into_trip_logbook, merge_trip_logbooks

with open("../src/tests/data/gtfs_realtime_pull_1.dat", "rb") as f:
    gtfs_r0 = gtfs_realtime_pb2.FeedMessage()
    gtfs_r0.ParseFromString(f.read())
with open("../src/tests/data/gtfs_realtime_pull_2.dat", "rb") as f:
    gtfs_r1 = gtfs_realtime_pb2.FeedMessage()
    gtfs_r1.ParseFromString(f.read())

left_logbook = parse_feeds_into_trip_logbook([gtfs_r0], [0])
right_logbook = parse_feeds_into_trip_logbook([gtfs_r1], [1])

In [3]:
# Slow!
# result = merge_trip_logbooks([left_logbook, right_logbook])

The above makes the problem obvious: we are running `_join_trip_logs`, a multi-second operation, 162 times here. So we need to make that function faster.
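For reference, here is a minimal sketch of how a call count like the one above can be read out of a profile using only the standard library. `slow_join` and `merge_all` are hypothetical stand-ins, not the real `_join_trip_logs` or `merge_trip_logbooks`; only the counting technique is the point.

```python
import cProfile
import pstats


def slow_join(left, right):
    # Hypothetical stand-in for the expensive per-trip join.
    return left + right


def merge_all(pairs):
    # Hypothetical stand-in for the merge loop: one join per trip.
    return [slow_join(left, right) for left, right in pairs]


profiler = cProfile.Profile()
profiler.enable()
merged = merge_all([([i], [i + 1]) for i in range(162)])
profiler.disable()

# stats.stats maps (filename, lineno, funcname) to
# (primitive calls, total calls, total time, cumulative time, callers).
stats = pstats.Stats(profiler)
call_counts = {func[2]: entry[1] for func, entry in stats.stats.items()}
print(call_counts["slow_join"])
```

Multiplying the per-call time by the call count gives a rough lower bound on the full merge time, which is why a multi-second join is untenable here.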

In [4]:
from processing import _join_trip_logs

Old code:

This is a 20x speedup. However, the full merge still takes 20 seconds (!), due to `__setitem__` calls still in `_join_trip_logs`. It's not transparent to me where these are coming from, so I spent some time muddling about in it.

We got another 33% speedup by avoiding `DataFrame` transforms.
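To illustrate why `__setitem__` calls add up, here is a minimal sketch contrasting cell-by-cell `.loc` assignment with building the column in one go. This is not the code in `processing.py`; the function names are hypothetical and the column names are borrowed from the logbook output purely for flavor.

```python
import numpy as np
import pandas as pd


def fill_cellwise(n):
    # Every .loc assignment goes through pandas' indexing machinery
    # (__setitem__ -> _setitem_with_indexer -> ...), once per cell.
    df = pd.DataFrame({"stop_id": [f"{i}S" for i in range(n)],
                       "maximum_time": np.nan})
    for i in range(n):
        df.loc[i, "maximum_time"] = float(i)
    return df


def fill_bulk(n):
    # Compute the values in a plain array first, then build the frame once.
    return pd.DataFrame({"stop_id": [f"{i}S" for i in range(n)],
                         "maximum_time": np.arange(n, dtype=float)})


cellwise = fill_cellwise(50)
bulk = fill_bulk(50)
print(cellwise.equals(bulk))
```

Both functions produce the same frame; timing them with `%timeit` should show the bulk version pulling ahead as `n` grows, though the exact ratio depends on the pandas version.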

Eventually we get a really huge speedup, from effectively forever to less than a second. That is still slower than I expected, but this seems to be as fast as it gets...

In [6]:
from pyinstrument import Profiler

profiler = Profiler()
profiler.start()

# code you want to profile
result = merge_trip_logbooks([left_logbook, right_logbook])

profiler.stop()

print(profiler.output_text(unicode=True, color=True))

0.862 _join_trip_logs  processing.py:592
├─ 0.336 __setitem__  pandas/core/indexing.py:135
│  ├─ 0.186 _setitem_with_indexer  pandas/core/indexing.py:233
│  │  ├─ 0.115 setter  pandas/core/indexing.py:455
│  │  │  ├─ 0.059 __setitem__  pandas/core/frame.py:2405
│  │  │  │  └─ 0.049 _set_item  pandas/core/frame.py:2473
│  │  │  │     ├─ 0.030 _set_item  pandas/core/generic.py:1499
│  │  │  │     │  └─ 0.028 set  pandas/core/internals.py:3636
│  │  │  │     └─ 0.018 _sanitize_column  pandas/core/frame.py:2593
│  │  │  ├─ 0.026 setitem  pandas/core/internals.py:3167
│  │  │  │  └─ 0.026 apply  pandas/core/internals.py:2978
│  │  │  │     └─ 0.022 setitem  pandas/core/internals.py:654
│  │  │  └─ 0.018 copy  pandas/core/generic.py:3057
│  │  │  

In [7]:
import os

logs = [f for f in os.listdir("./data/subway_time_20160512") if f != 'arch.tar.xz' 
        and 'si' not in f and 'l' not in f]

In [8]:
logs[:5]

['gtfs-20160512T0415Z',
 'gtfs-20160512T1759Z',
 'gtfs-20160512T2155Z',
 'gtfs-20160512T0610Z',
 'gtfs-20160513T0153Z']

In [9]:
from google.transit import gtfs_realtime_pb2

In [10]:
def parse_feed(filepath):
    with open(filepath, "rb") as f:
        try:
            fm = gtfs_realtime_pb2.FeedMessage()
            fm.ParseFromString(f.read())
            return fm
        except Exception:
            # Corrupt or truncated feed: skip it. KeyboardInterrupt and
            # SystemExit are not Exception subclasses, so they still propagate.
            return None

Note: the data above comes from the next notebook. You can get it yourself by using the following magical incantation:

    pip install requests gtfs-realtime-bindings
    python -c "import requests; r = requests.get('http://data.mytransit.nyc.s3.amazonaws.com/subway_time/2016/2016-05/subway_time_20160512.tar.xz'); open('arch.tar.xz', 'wb').write(r.content)"
    tar xvfJ arch.tar.xz
    python -c "from google.transit import gtfs_realtime_pb2; test_example = gtfs_realtime_pb2.FeedMessage(); test_example.ParseFromString(open('gtfs-20160512T0400Z', 'rb').read()); print(type(test_example))"

In [11]:
from tqdm import tqdm

In [12]:
feeds = [parse_feed("./data/subway_time_20160512/" + l) for l in tqdm(logs[:6])]

100%|██████████| 6/6 [00:01<00:00,  4.33it/s]


In [13]:
import sys; sys.path.append("../src/")
from processing import parse_feeds_into_trip_logbook

In [16]:
logbooks = [
    parse_feeds_into_trip_logbook(feeds[0:3], [0, 1, 2]), 
    parse_feeds_into_trip_logbook(feeds[3:6], [3, 4, 5])
]

This is now very fast.

In [19]:
%timeit master = merge_trip_logbooks(logbooks)

1 loop, best of 3: 305 ms per loop


Now we need to vet that the result of our join operation is what it should be.

Here's one suspicious result.

In [36]:
list(master.keys())[14]

'083300_5..S03R'

In [37]:
llog = parse_feeds_into_trip_logbook(feeds[0:3], [0, 1, 2])
rlog = parse_feeds_into_trip_logbook(feeds[3:6], [3, 4, 5])

In [58]:
suspect = [entity for entity in feeds[5].entity if entity.trip_update.trip.trip_id == "083300_5..S03R"][0]

In [59]:
suspect

id: "000222"
trip_update {
  trip {
    trip_id: "083300_5..S03R"
    start_date: "20160512"
    route_id: "5"
  }
  stop_time_update {
    arrival {
      time: 1463080413
    }
    departure {
      time: 1463080593
    }
    stop_id: "244S"
  }
  stop_time_update {
    arrival {
      time: 1463080683
    }
    departure {
      time: 1463080683
    }
    stop_id: "245S"
  }
  stop_time_update {
    arrival {
      time: 1463080773
    }
    departure {
      time: 1463080773
    }
    stop_id: "246S"
  }
  stop_time_update {
    arrival {
      time: 1463080923
    }
    stop_id: "247S"
  }
}

In [38]:
llog['083300_5..S03R']

Unnamed: 0,trip_id,route_id,action,minimum_time,maximum_time,stop_id,latest_information_time
0,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,501S,1
1,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,502S,1
2,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,503S,1
3,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,504S,1
4,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,505S,1
5,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,213S,1
6,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,214S,1
7,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,215S,1
8,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,216S,1
9,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,217S,1


In [39]:
rlog['083300_5..S03R']

Unnamed: 0,trip_id,route_id,action,minimum_time,maximum_time,stop_id,latest_information_time
0,083300_5..S03R,5,STOPPED_AT,,5.0,244S,5
1,083300_5..S03R,5,EN_ROUTE_TO,5.0,,245S,5
2,083300_5..S03R,5,EN_ROUTE_TO,5.0,,246S,5
3,083300_5..S03R,5,EN_ROUTE_TO,5.0,,247S,5


In [35]:
master[list(master.keys())[14]]

Unnamed: 0,trip_id,route_id,action,minimum_time,maximum_time,stop_id,latest_information_time
0,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,501S,1
1,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,502S,1
2,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,503S,1
3,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,504S,1
4,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,505S,1
5,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,213S,1
6,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,214S,1
7,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,215S,1
8,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,216S,1
9,083300_5..S03R,5,STOPPED_OR_SKIPPED,1,2.0,217S,1


After examination, it turns out that this happened because this train appeared in the data in feed position 1, then again in feed position 5, but nowhere else in our six feeds. The resulting log is therefore actually correct, except for `maximum_time` being set to 2, which is wrong. This happened because I didn't account for this possibility when designing the algorithm that writes that field; this is the first time I have seen this happen!

There is a deeper problem here, however: I detect finalization based on the disappearance of a trip from the record. If a trip can just appear and disappear in the record like this, then...that doesn't work.

For now let's keep this uh-oh in mind. I will have to devise a way of fixing it later.
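As a starting point for that later fix, here is a minimal sketch of how the appear, vanish, reappear pattern could be detected. It is not part of `processing.py`: both helper names are hypothetical, and the input is assumed to be one set of trip IDs per feed, in feed order. The sample data only mimics the case above (a trip present at positions 1 and 5 out of six feeds).

```python
def appearance_positions(trip_ids_per_feed, trip_id):
    """Return the feed positions (list indices) in which trip_id appears."""
    return [i for i, ids in enumerate(trip_ids_per_feed) if trip_id in ids]


def has_gap(positions):
    """True if the trip was missing from at least one feed between two sightings."""
    return any(later - earlier > 1
               for earlier, later in zip(positions, positions[1:]))


# Hypothetical data: "A" is a well-behaved trip, the suspect comes and goes.
trip_ids_per_feed = [
    {"A"},
    {"A", "083300_5..S03R"},
    {"A"},
    {"A"},
    set(),
    {"083300_5..S03R"},
]

positions = appearance_positions(trip_ids_per_feed, "083300_5..S03R")
print(positions, has_gap(positions))
```

With real feeds, the per-feed sets could presumably be built with something like `{e.trip_update.trip.trip_id for e in feed.entity if e.HasField("trip_update")}`. Any trip for which `has_gap` is true is one where "disappeared from the record" cannot be read as "finished".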