In [14]:
import csv, datetime, json, os, psycopg2, requests, time

def format_intpt(lat, lon):
    return f'{str(float(lat))},{str(float(lon))}' 

if not os.path.exists("out"):
    os.makedirs("out")

In [15]:
class OTPClient:
    """A simple OpenTripPlanner client"""
    def __init__(self,port='9090'):
        self.url = 'http://localhost:%s/otp/routers/default' % port
        
    def get_plan(self, fromPlace, toPlace, **kwargs):
        t = time.localtime()
        time_ = time.strftime('%I:%M%p', t)
        if 'time' in kwargs:
            time_ = kwargs['time']
        date_ = time.strftime('%m-%d-%Y', t)
        if 'date' in kwargs:
            date_ = kwargs['date']
        mode = 'TRANSIT,WALK'
        if 'mode' in kwargs:
            mode = kwargs['mode']
        maxWalkDistance = 4828.032 # 3 miles in meters
        if 'maxWalkDistance' in kwargs:
            maxWalkDistance = kwargs['maxWalkDistance']
        wheelchair = 'false'
        if 'wheelchair' in kwargs:
            wheelchair = kwargs['wheelchair']
        debugItinerary = 'false'
        if 'debugItinerary' in kwargs:
            debugItinerary = kwargs['debugItinerary']
        arriveBy = 'false'
        if 'arriveBy' in kwargs:
            arriveBy = kwargs['arriveBy']
        locale = 'en'
        params = {
            'fromPlace': fromPlace,
            'toPlace': toPlace,
            'time': time_,
            'date': date_,
            'mode': mode,
            'maxWalkDistance': maxWalkDistance,
            'arriveBy': arriveBy,
            'wheelchair': wheelchair,
            'debugItinerary': debugItinerary,
            'locale': locale
            
        }
        r = requests.get(self.url + '/plan', params=params)
        return r

    def get_geocode(self, query):
        params = {
            'query': query,
        }
        r = requests.get(self.url + '/geocode', params=params)
        return r


In [3]:
from datetime import datetime, timedelta

def datetime_range(start, end, delta):
    current = start
    while current < end:
        yield current
        current += delta

transit_time_date = \
    [(dt.strftime('%I:%M%p'),dt.strftime('%m-%d-%Y')) for dt in 
       datetime_range(datetime(2021, 5, 3, 7), datetime(2021, 5, 3, 9), 
       timedelta(minutes=10))]

print(transit_time_date)

[('07:00AM', '05-03-2021'), ('07:10AM', '05-03-2021'), ('07:20AM', '05-03-2021'), ('07:30AM', '05-03-2021'), ('07:40AM', '05-03-2021'), ('07:50AM', '05-03-2021'), ('08:00AM', '05-03-2021'), ('08:10AM', '05-03-2021'), ('08:20AM', '05-03-2021'), ('08:30AM', '05-03-2021'), ('08:40AM', '05-03-2021'), ('08:50AM', '05-03-2021')]


In [4]:
with open('../data/pgh_census_blocks.geojson') as f:
    pgh = json.load(f)

In [16]:
def process_routes(org, dst, result):
    if 'plan' not in result:
        new_result = {
            'org': org,
            'dst': dst,
            'fare': None,
            'start_date': None,
            'initial_wait': None,
            'duration': None,
            'total_duration': None
            }        
        return new_result
        
    results = []
    init_date = result['plan']['date']
    for i in result['plan']['itineraries']:
        if not i['walkLimitExceeded']:
            initial_wait = (i['startTime'] - init_date)/1000
            duration = i['duration']
            total_duration = duration + initial_wait
            fare = 0
            if 'fare' in i and 'fare' in i['fare'] and 'regular' in i['fare']['fare']:
                fare = i['fare']['fare']['regular']['cents']
            new_result = {
                'org': org,
                'dst': dst,
                'fare': fare,
                'start_date': int(init_date/1000),
                'initial_wait': int(initial_wait),
                'duration': int(duration),
                'total_duration': int(total_duration)
            }
            results.append(new_result)
    if len(results) > 0:
        return sorted(results, key = lambda r: r['total_duration'])[0]            
    else:
        new_result = {
            'org': org,
            'dst': dst,
            'fare': None,
            'start_date': None,
            'initial_wait': None,
            'duration': None,
            'total_duration': None
            }
        
        return new_result
        
def get_transit_route(i,j,time,date):    
    a = pgh['features'][i]
    b = pgh['features'][j]
    org = format_intpt(a['properties']['INTPTLAT10'],a['properties']['INTPTLON10'])
    dst = format_intpt(b['properties']['INTPTLAT10'],b['properties']['INTPTLON10'])
    org_idx = a['properties']['GEOID10']
    dst_idx = b['properties']['GEOID10']
    otp = OTPClient()
    r = otp.get_plan(org,dst,time=time,date=date)
    route = process_routes(org_idx, dst_idx, r.json())
    insert_route(route)

def insert_route(route):    
    record_to_insert = (route['org'], route['dst'], route['fare'], route['start_date'], route['initial_wait'], route['duration'], route['total_duration'])

    sql = """ INSERT INTO transit_routes(org, dst, fare, start_date, initial_wait, duration, total_duration)
              VALUES(%s, %s, %s, %s, %s, %s, %s);"""
    conn = None
    try:
        conn = psycopg2.connect(dbname="opentripplanner")
        cur = conn.cursor()
        cur.execute(sql, record_to_insert)
        conn.commit()
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

def get_empty_routes():
    rows = []
    """ query data from the vendors table """
    conn = None
    try:
        conn = psycopg2.connect(dbname="opentripplanner")
        cur = conn.cursor()
        cur.execute("SELECT org, dst FROM transit_routes WHERE fare is null;")
        print("The number of null rows: ", cur.rowcount)
        row = cur.fetchone()
        while row is not None:
            rows.append(row)
            print(row)
            row = cur.fetchone()

        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()
    return rows            

In [6]:
import concurrent, concurrent.futures, datetime, math, shutil, subprocess, sys, time, traceback

class SimpleThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    def __init__(self, max_workers):
        super(SimpleThreadPoolExecutor, self).__init__(max_workers=max_workers)
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        future = super(SimpleThreadPoolExecutor, self).submit(fn, *args, **kwargs)
        self.futures.append(future)
        return future

    def get_futures(self):
        return self.futures

    def shutdown(self):
        exception_count = 0
        results = []
        for completed in concurrent.futures.as_completed(self.futures):
            try:
                results.append(completed.result())
            except Exception as e:
                exception_count += 1
                sys.stderr.write(
                    'Exception caught in SimpleThreadPoolExecutor.shutdown.  Continuing until all are finished.\n' +
                    'Exception follows:\n' +
                    traceback.format_exc())
        super(SimpleThreadPoolExecutor, self).shutdown()
        if exception_count:
            raise Exception('SimpleThreadPoolExecutor failed: %d of %d raised exception' % (exception_count, len(self.futures)))
        print('SimpleThreadPoolExecutor succeeded: all %d jobs completed' % len(self.futures))
        return results


class SimpleProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    def __init__(self, max_workers):
        super(SimpleProcessPoolExecutor, self).__init__(max_workers=max_workers)
        self.futures = []

    def submit(self, fn, *args, **kwargs):
        future = super(SimpleProcessPoolExecutor, self).submit(fn, *args, **kwargs)
        self.futures.append(future)
        return future

    def get_futures(self):
        return self.futures

    def shutdown(self):
        exception_count = 0
        results = []
        for completed in concurrent.futures.as_completed(self.futures):
            try:
                results.append(completed.result())
            except Exception as e:
                exception_count += 1
                sys.stderr.write(
                    'Exception caught in SimpleProcessPoolExecutor.shutdown.  Continuing until all are finished.\n' +
                    'Exception follows:\n' +
                    traceback.format_exc())
        super(SimpleProcessPoolExecutor, self).shutdown()
        if exception_count:
            raise Exception('SimpleProcessPoolExecutor failed: %d of %d raised exception' % (exception_count, len(self.futures)))
        print('SimpleProcessPoolExecutor succeeded: all %d jobs completed' % len(self.futures))
        return results

    def kill(self, signal=9):
        for pid in self._processes.keys():
            print('Killing %d with signal %d' % (pid, signal))
            os.kill(pid, signal)

In [17]:
#otp = OTPClient()
#a = pgh['features'][7]
#b = pgh['features'][1]
#org = format_intpt(a['properties']['INTPTLAT10'],a['properties']['INTPTLON10'])
#dst = format_intpt(b['properties']['INTPTLAT10'],b['properties']['INTPTLON10'])
#r = otp.get_plan(org,dst,time='07:00AM',date='05-03-2021')

In [13]:
time_ = '07:00AM'
date_ = '05-03-2021'
start = time.time()
pool = SimpleThreadPoolExecutor(15)
for i in range(8800,9000):
    for j in range(len(pgh['features'])):
        pool.submit(get_transit_route, i, j, time_, date_)            
pool.shutdown()
None
end = time.time()
print(end - start)

SimpleThreadPoolExecutor succeeded: all 1820800 jobs completed
34542.60036325455
