In [1]:
import csv
import requests
import json
import time as tm
import sqlite3
import traceback
import sys
from math import cos, asin, sqrt
from multiprocessing.dummy import Pool as ThreadPool
import os
import datetime
from IPython.display import clear_output
import pickle
import pandas as pd
import random

from tfl_journeyplanner_api_tools import *

In [2]:
# Read in origin longlats: pop-weighted MSOA centroids
orig_coords = []
with open('PopWeightedCentroids2011LondonMSOA_coords.csv', 'r') as inputfile:
    reader = csv.reader(inputfile, delimiter = ',')
    for row in reader:
        orig_coords.append((row[0], row[4], row[5])) # read in longlats coords from csv file into tuples
orig_coords = orig_coords[1:]

# Read in destination longlats: jobs-weighted MSOA centroids
dest_coords = []
with open('JobsWeightedCentroids2011LondonMSOA_coords.csv', 'r') as inputfile:
    reader = csv.reader(inputfile, delimiter = ',')
    for row in reader:
        dest_coords.append((row[0], row[1], row[2])) # read in longlats coords from csv file into tuples
dest_coords = dest_coords[1:]

odpairs = GenerateODPairs(orig_coords[:], dest_coords[:]) # specify how many OD pairs to generate here
print(len(orig_coords), len(dest_coords)) # checking data import was successful
print(len(odpairs), "pairs generated. First 10:", odpairs[:10])

983 983
966289 pairs generated. First 10: [(('E02006854', '-0.025046237', '51.50164399'), ('E02000001', '-0.091155997', '51.51475914')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000002', '0.136596432', '51.58623085')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000003', '0.138810779', '51.5705831')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000004', '0.17624198', '51.56078086')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000005', '0.144375564', '51.56175313')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000007', '0.154249648', '51.55985824')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000008', '0.138580433', '51.55208708')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000009', '0.118662175', '51.55239182')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000010', '0.150976445', '51.5482683')), (('E02006854', '-0.025046237', '51.50164399'), ('E02000011', '0.162473217', '51.54805375'))]


In [3]:
# set up currentpairs
target_msoas = ['E02000384','E02000455','E02000558','E02000649','E02000664','E02000779','E02000001'] # 6 cluster centroids and City
currentpairs = [odpair for odpair in odpairs if odpair[1][0] in target_msoas]
random.shuffle(currentpairs) # randomise order to randomise API call time per pair
print(len(currentpairs))

6881


In [4]:
# set up batchsettings
batchsettings = {'processingstarttime': datetime.datetime(2019,4,25,6,0), # 0600 or 1100
                 'processingendtime': datetime.datetime(2019,4,25,8,0), # 0800 or 1300
                 'date': '', #'&date=20190322',
                 'time': '', #'&time=0830',
                 'timeIs': '', #'&timeIs=Arriving',
                 'journeyPreference': '&journeyPreference=leastTime',
                 'useMultiModalCall': '&useMultiModalCall=true'}

In [5]:
#set up db
db = '20190425_ampeak/traveloptions_to_cityplus6clusters.db'
if not os.path.exists('20190425_ampeak'):
    os.makedirs('20190425_ampeak')
setup_db(db)

In [6]:
# set up network data
with open('stoplocations.json', 'r') as infile:
    stoplocations = json.load(infile)
stoplist = {}
for stop in stoplocations['features']:
    stoplist[stop['properties']['NAPTAN']] = stop['geometry']['coordinates']
# with open('networkdata', 'rb') as infile:
#     edges, nodes = pickle.load(infile)
edges = {}
nodes = {}
print(len(stoplist), len(edges), len(nodes))

12838 0 0


In [7]:
# Execute data collection from API and write to DB
# Multithreaded version
# Auto-looping to reprocess pairs with errors

while len(currentpairs) > 0:
    # Batching of OD pairs into 290s
    startpos = 0
    nbatches = len(currentpairs) // 290 + 1
    initialbatchsize = len(currentpairs) % 290
    processed = set()
    errors = []

    # timed start
    while datetime.datetime.now() < batchsettings['processingstarttime']:
        print('Time is', datetime.datetime.now(), '. Starting processing at', batchsettings['processingstarttime'])
        tm.sleep(10)
        clear_output()

    print("Starting processing:", len(currentpairs), "pairs in", nbatches, "batches.")

    for batch in range(nbatches):
        outputs = []
        starttime = tm.clock()
        pool = ThreadPool(12)

        # multithreaded query the next batch of 290
        outputs.extend(pool.starmap(call_api, [(pair, stoplist, batchsettings) for pair in currentpairs[startpos : (initialbatchsize + batch * 290)]]))
        pool.close()
        pool.join()

        downloadtime = tm.clock() - starttime

        # single-thread write this batch's outputs to db - to avoid schema locks
        for output in outputs:
            for option in output:
                if option[4]:
                    write_to_db(db, option[0][0][0], option[0][1][0], option[1], option[2])
                    write_to_network(edges, nodes, option[3])
                    processed.add(option[0])
                else:
                    errors.append(option[0])

        processingtime = tm.clock() - starttime

        print('Processed batch', batch, 'in', processingtime, 's (', str(downloadtime/processingtime * 100)[:4], '% download time).', 
              len(processed) + len(errors), 'pairs processed altogether with', len(errors), 'errors so far.')

        # update next batch's startpos to be this batch's endpos
        startpos = initialbatchsize + batch * 290

        # check if processing time has expired
        if datetime.datetime.now() > batchsettings['processingendtime']:
            print('Abort - processing time ended at', datetime.datetime.now())
            break

        # if finished this batch in less than 60s, wait until 60s has passed for this batch
        if processingtime < 60:
            print('Waiting', 60 - processingtime, 's before next batch.')
            tm.sleep(60 - processingtime)
    print('Completed.')
    
    # check if processing time has expired
    if datetime.datetime.now() > batchsettings['processingendtime']:
        print('Abort - processing time ended at', datetime.datetime.now())
        break
        
    # if all pairs processed this round produced errors, then break the processing
    if len(errors) == len(currentpairs):
        print('Abort - all pairs processed this batch produced errors.')
        break
    else:
        currentpairs = [pair for pair in currentpairs if pair not in processed]


Starting processing: 6881 pairs in 24 batches.
Processed batch 0 in 180.14939814417122 s ( 75.1 % download time). 211 pairs processed altogether with 0 errors so far.
Processed batch 1 in 245.60318816620426 s ( 74.1 % download time). 501 pairs processed altogether with 0 errors so far.
Processed batch 2 in 244.79697328441648 s ( 74.3 % download time). 791 pairs processed altogether with 0 errors so far.
Processed batch 3 in 239.6671358102369 s ( 73.7 % download time). 1081 pairs processed altogether with 0 errors so far.
Processed batch 4 in 262.3663285002442 s ( 76.3 % download time). 1371 pairs processed altogether with 0 errors so far.
Processed batch 5 in 241.90922811470864 s ( 74.5 % download time). 1661 pairs processed altogether with 0 errors so far.
Processed batch 6 in 239.6686348320327 s ( 73.2 % download time). 1951 pairs processed altogether with 0 errors so far.
Processed batch 7 in 225.9253714701165 s ( 72.3 % download time). 2241 pairs processed altogether with 0 errors 

In [8]:
currentpairs = [pair for pair in currentpairs if pair not in processed]
len(currentpairs)

0

In [9]:
with open('20190425_ampeak/networkdata', 'wb') as outfile:
    pickle.dump((edges, nodes), outfile)

In [10]:
# getting disruption data
results = requests.get('https://api.tfl.gov.uk/Line/Mode/bus%2Ccoach%2Ctube%2Cdlr%2Ctram%2Ctflrail%2Coverground%2Cnational-rail/Disruption')
disruptions = json.loads(results.content)
with open('20190425_ampeak/disruptions', 'wb') as outfile:
    pickle.dump(disruptions, outfile)

In [11]:
disruptions

[{'$type': 'Tfl.Api.Presentation.Entities.Disruption, Tfl.Api.Presentation.Entities',
  'category': 'Information',
  'type': 'specialService',
  'categoryDescription': 'Information',
  'description': 'http://www.nationalrail.co.uk/',
  'additionalInfo': 'Custom',
  'affectedRoutes': [],
  'affectedStops': [],
  'closureText': 'specialService'},
 {'$type': 'Tfl.Api.Presentation.Entities.Disruption, Tfl.Api.Presentation.Entities',
  'category': 'Information',
  'type': 'specialService',
  'categoryDescription': 'Information',
  'description': 'http://www.nationalrail.co.uk/',
  'additionalInfo': 'Custom',
  'affectedRoutes': [],
  'affectedStops': [],
  'closureText': 'specialService'},
 {'$type': 'Tfl.Api.Presentation.Entities.Disruption, Tfl.Api.Presentation.Entities',
  'category': 'Information',
  'type': 'minorDelays',
  'categoryDescription': 'Information',
  'description': 'http://www.nationalrail.co.uk/',
  'additionalInfo': 'Minor delays on some routes',
  'affectedRoutes': [],
