## GTFS - Transfer Edges

In [247]:
import bisect
import pandas as pd
import datetime
import pickle
import numpy as np
import geopy.distance
from tqdm.auto import tqdm
from tqdm import tqdm_notebook
tqdm.pandas()
import dask.dataframe as dd
from dask.multiprocessing import get
from geopy.point import Point
import datetime as dt
from multiprocessing import Pool, cpu_count
from contextlib import closing
import math

from dask.diagnostics import ProgressBar
ProgressBar().register()

DATA_PATH = '../../../input_data/synthetic_examples/input_data/Test1/'

In [248]:
MAX_TRANSFER_DIST = 200
MAX_WAIT_TIME = dt.timedelta(minutes=10)
DAY = dt.datetime(2016, 1, 4)

In [249]:
OUTPUT_PATH = '../../../output_data/validation/test1/'

## Load Nodes

In [250]:
# Load nodes
FILES_PREFIX = 'morning'
file_name = OUTPUT_PATH + FILES_PREFIX + '_nodes.pkl'
print(f'Loading nodes from file {file_name}')
nodes_df = pd.read_pickle(file_name)
nodes_df.head(3)

Loading nodes from file ../../../output_data/validation/test1/morning_nodes.pkl


Unnamed: 0,index,trip_id,arrival_time,departure_time_stop,stop_id,stop_sequence,route_id,departure_time_trip_departure,stop_code,stop_lat,stop_lon,arrival,departure,node_id
0,25,19285956_151215,07:08:00,07:08:00,25123,1,2544,07:08:00,38160,32.050923,34.849426,2016-01-04 07:08:00,2016-01-04 07:08:00,0
1,26,19285957_151215,07:17:00,07:17:00,25123,1,2544,07:17:00,38160,32.050923,34.849426,2016-01-04 07:17:00,2016-01-04 07:17:00,1
2,27,19285958_151215,07:26:00,07:26:00,25123,1,2544,07:26:00,38160,32.050923,34.849426,2016-01-04 07:26:00,2016-01-04 07:26:00,2


In [251]:
nodes_df.shape

(1139, 14)

In [252]:
nodes_df.nunique()

index                            1139
trip_id                            42
arrival_time                     1080
departure_time_stop              1080
stop_id                            64
stop_sequence                      37
route_id                            2
departure_time_trip_departure      41
stop_code                          64
stop_lat                           64
stop_lon                           64
arrival                          1080
departure                        1080
node_id                          1139
dtype: int64

### Filter nodes for development

In [253]:
# start_time = DAY + dt.timedelta(hours=8)
# end_time = start_time + dt.timedelta(minutes=10)

# nodes_df = nodes_df[nodes_df['arrival'] > start_time][nodes_df['arrival'] < end_time]

In [254]:
# nodes_df.shape

In [255]:
# nodes_df.nunique()

In [256]:
nodes_df[nodes_df['stop_id'] == 12816].head()

Unnamed: 0,index,trip_id,arrival_time,departure_time_stop,stop_id,stop_sequence,route_id,departure_time_trip_departure,stop_code,stop_lat,stop_lon,arrival,departure,node_id


## Parallelizing

In [257]:
def run_parallel_df(df_grouped, func):
    with closing(Pool()) as pool:
        res_list = pool.map(func, [group for name, group in df_grouped])
    return pd.concat(res_list)

In [258]:
def run_parallel_dict(data, num_of_batches, func):
    batch_size = math.ceil(len(data) / num_of_batches)
    with closing(Pool()) as pool:
#         res_list = []
        data_items = list(data.items())
        print(f'Processing data in workers pool. Numner of batches is {num_of_batches} where each batch has approx. {batch_size} keys')
        sliced_data = [data_items[i:i+batch_size] for i in range(num_of_batches)]
        print('Finished slicing the data.')
#         curr_batch = 1
        res_list = pool.map(func, sliced_data)
#         for res in tqdm_notebook(pool.imap(func, sliced_data), total=num_of_batches):
#             res_list.append(res)
#             print(f'Finished processing batch number {curr_batch} of {num_of_batches} total. Number of transfers from this batch: {len(res)}')
#             curr_batch += 1
    return [item for sublist in res_list for item in sublist]

#### Testing parallelization

In [259]:
# d = {'a': [1,2,3], 'b':[1]}
# # num_of_batches = 2
# # d_items = list(d.items())
# # batch_size = math.ceil(len(d) / num_of_batches)
# # sliced_d = [d_items[i:i+batch_size] for i in range(num_of_batches)]
# # print(len(d))

# def print_dict_with_sleep(data):
#     print('processing..')
#     data = dict(data)
#     for k,v in data.items():
#         print(len(v))
# #     time.sleep(3)
# run_parallel_dict(d, 2, print_dict_with_sleep)

## Load Nearby Stops

In [260]:
stops_mapping_df = pd.read_csv(DATA_PATH + 'StopsToStops.csv')

In [261]:
stops_mapping_df.head(3)

Unnamed: 0,in_stop_id,near_stop_id,near_dist
0,12834,12834,0.0
1,12841,12841,0.0
2,12889,12889,0.0


In [262]:
stops_mapping_df.columns 

Index(['in_stop_id', 'near_stop_id', 'near_dist'], dtype='object')

In [263]:
if 'one_line' in OUTPUT_PATH or 'two_lines' in OUTPUT_PATH:
        stops_mapping_df.columns = ['NEAR_DIST', 'IN_FID', 'NEAR_FID']
else:
    stops_mapping_df.columns = ['IN_FID', 'NEAR_FID', 'NEAR_DIST']  # TODO: switch order for other inputs (or better- use a dictionary to map colun names)

In [264]:
stops_mapping_df.columns

Index(['IN_FID', 'NEAR_FID', 'NEAR_DIST'], dtype='object')

In [265]:
# stops_mapping_df.shape

In [266]:
# stops_mapping_df.nunique()

In [267]:
# TODO: 
# change walk speed to 1m/s - IMPL
# maybe switch to 100-150
# 15 minutes walk + wait

# hoping to get to 10-20 edges per node

In [268]:
stops_df = pd.read_csv(DATA_PATH + 'stops.txt')

### Get all stops from which we wish to start computations

In [269]:
nodes_df.head()

Unnamed: 0,index,trip_id,arrival_time,departure_time_stop,stop_id,stop_sequence,route_id,departure_time_trip_departure,stop_code,stop_lat,stop_lon,arrival,departure,node_id
0,25,19285956_151215,07:08:00,07:08:00,25123,1,2544,07:08:00,38160,32.050923,34.849426,2016-01-04 07:08:00,2016-01-04 07:08:00,0
1,26,19285957_151215,07:17:00,07:17:00,25123,1,2544,07:17:00,38160,32.050923,34.849426,2016-01-04 07:17:00,2016-01-04 07:17:00,1
2,27,19285958_151215,07:26:00,07:26:00,25123,1,2544,07:26:00,38160,32.050923,34.849426,2016-01-04 07:26:00,2016-01-04 07:26:00,2
3,28,19285959_151215,07:35:00,07:35:00,25123,1,2544,07:35:00,38160,32.050923,34.849426,2016-01-04 07:35:00,2016-01-04 07:35:00,3
4,29,19285960_151215,07:45:00,07:45:00,25123,1,2544,07:45:00,38160,32.050923,34.849426,2016-01-04 07:45:00,2016-01-04 07:45:00,4


In [270]:
nodes_df[nodes_df['stop_id'] == 12816]

Unnamed: 0,index,trip_id,arrival_time,departure_time_stop,stop_id,stop_sequence,route_id,departure_time_trip_departure,stop_code,stop_lat,stop_lon,arrival,departure,node_id


In [271]:
t1 = DAY+dt.timedelta(hours=7, minutes=15)
t2 = t1 + dt.timedelta(minutes=15)


In [272]:
t1

datetime.datetime(2016, 1, 4, 7, 15)

In [273]:
t2

datetime.datetime(2016, 1, 4, 7, 30)

In [274]:
start_nodes_df = nodes_df[nodes_df['departure'] <= t2][nodes_df['departure'] >= t1]

  """Entry point for launching an IPython kernel.


In [275]:
start_nodes_df[start_nodes_df['stop_id'] == 12816]

Unnamed: 0,index,trip_id,arrival_time,departure_time_stop,stop_id,stop_sequence,route_id,departure_time_trip_departure,stop_code,stop_lat,stop_lon,arrival,departure,node_id


In [276]:
start_nodes_df.shape

(114, 14)

In [277]:
start_nodes_df.head()

Unnamed: 0,index,trip_id,arrival_time,departure_time_stop,stop_id,stop_sequence,route_id,departure_time_trip_departure,stop_code,stop_lat,stop_lon,arrival,departure,node_id
1,26,19285957_151215,07:17:00,07:17:00,25123,1,2544,07:17:00,38160,32.050923,34.849426,2016-01-04 07:17:00,2016-01-04 07:17:00,1
2,27,19285958_151215,07:26:00,07:26:00,25123,1,2544,07:26:00,38160,32.050923,34.849426,2016-01-04 07:26:00,2016-01-04 07:26:00,2
17,98,19285957_151215,07:18:54,07:18:54,15909,2,2544,07:17:00,31902,32.054576,34.847617,2016-01-04 07:18:54,2016-01-04 07:18:54,17
18,99,19285958_151215,07:27:54,07:27:54,15909,2,2544,07:26:00,31902,32.054576,34.847617,2016-01-04 07:27:54,2016-01-04 07:27:54,18
33,170,19285957_151215,07:20:48,07:20:48,15907,3,2544,07:17:00,31900,32.062576,34.84621,2016-01-04 07:20:48,2016-01-04 07:20:48,33


In [278]:
start_nodes_list = start_nodes_df['node_id'].to_list()
start_nodes_list = [str(n) for n in start_nodes_list]

In [279]:
start_nodes_list[:3]

['1', '2', '17']

In [280]:
'335' in start_nodes_list

False

In [281]:
'340' in start_nodes_list

False

In [282]:
with open((OUTPUT_PATH + FILES_PREFIX + '_start_nodes.pkl'), 'wb') as f:
    pickle.dump(start_nodes_list, f)

#### Validation - let's find the most frequent origin stops and make sure it seems right.

In [283]:
# stops_mapping_150m_df.IN_FID.mode()

In [284]:
# stops_df.iloc[10841]

In [285]:
# stops_df.iloc[10841]

We can see this is the central bus station, which makes sense it will have the most possible transfers.

Now we must re-map the nearby stops to stop_id as used in the GTFS feeds.

In [286]:
stops_df['IN_FID'] = stops_df['stop_id']

In [287]:
stops_df[['IN_FID', 'stop_id']].head()

Unnamed: 0,IN_FID,stop_id
0,12834,12834
1,12841,12841
2,12889,12889
3,12893,12893
4,12894,12894


In [288]:
stops_df.shape

(64, 9)

In [289]:
stops_mapping_df.head(2)

Unnamed: 0,IN_FID,NEAR_FID,NEAR_DIST
0,12834,12834,0.0
1,12841,12841,0.0


In [290]:
stops_mapping_df = stops_mapping_df.merge(stops_df[['IN_FID', 'stop_id']], on='IN_FID', how='left')
stops_mapping_df.rename(columns={'stop_id':'from_stop_id'}, inplace=True)
stops_mapping_df = stops_mapping_df.merge(stops_df[['IN_FID', 'stop_id']], left_on='NEAR_FID', right_on='IN_FID', how='left')
stops_mapping_df.rename(columns={'stop_id':'to_stop_id', 'NEAR_DIST':'dist'}, inplace=True)

In [291]:
stops_mapping_df = stops_mapping_df[['from_stop_id', 'to_stop_id', 'dist']]
stops_mapping_df.head()

Unnamed: 0,from_stop_id,to_stop_id,dist
0,12834,12834,0.0
1,12841,12841,0.0
2,12889,12889,0.0
3,12893,12893,0.0
4,12894,12894,0.0


In [292]:
AVG_WALK_SPEED = 0.83  # meters per second (m/s)
stops_mapping_df['walk_time_sec'] = stops_mapping_df['dist'] / AVG_WALK_SPEED

In [293]:
stops_mapping_df.head(3)

Unnamed: 0,from_stop_id,to_stop_id,dist,walk_time_sec
0,12834,12834,0.0,0.0
1,12841,12841,0.0,0.0
2,12889,12889,0.0,0.0


In [294]:
stops_mapping_df.to_csv(OUTPUT_PATH + 'nearby_stops.csv')

In [295]:
stops_mapping_df.to_pickle(OUTPUT_PATH + 'nearby_stops.pkl')

In [296]:
stops_mapping_df.shape

(72, 4)

In [297]:
stops_mapping_df.nunique()

from_stop_id     64
to_stop_id       64
dist              5
walk_time_sec     5
dtype: int64

In [298]:
# stops_mapping_df = pd.read_pickle(OUTPUT_PATH + 'nearby_stops.pkl')

In [299]:
stops_mapping_df = stops_mapping_df[stops_mapping_df['dist'] < MAX_TRANSFER_DIST]

In [300]:
stops_mapping_df.shape

(72, 4)

In [301]:
stops_mapping_df.nunique()

from_stop_id     64
to_stop_id       64
dist              5
walk_time_sec     5
dtype: int64

## Compute Transfer Edges

In [302]:
stops_to_nodes = nodes_df.groupby('stop_id')[[
    'node_id', 'trip_id', 'arrival', 'departure', 'route_id']].apply(lambda g: g.values.tolist()).to_dict()

In [303]:
dict(list(stops_to_nodes.items())[0:2])

{12834: [[573,
   '18673598_161215',
   Timestamp('2016-01-04 08:16:45'),
   Timestamp('2016-01-04 08:16:45'),
   9807],
  [574,
   '18673599_161215',
   Timestamp('2016-01-04 08:25:45'),
   Timestamp('2016-01-04 08:25:45'),
   9807],
  [575,
   '18673600_161215',
   Timestamp('2016-01-04 08:35:45'),
   Timestamp('2016-01-04 08:35:45'),
   9807],
  [576,
   '18673601_161215',
   Timestamp('2016-01-04 08:45:45'),
   Timestamp('2016-01-04 08:45:45'),
   9807],
  [577,
   '18673602_161215',
   Timestamp('2016-01-04 08:56:45'),
   Timestamp('2016-01-04 08:56:45'),
   9807],
  [578,
   '18673603_161215',
   Timestamp('2016-01-04 09:08:45'),
   Timestamp('2016-01-04 09:08:45'),
   9807],
  [579,
   '18673604_161215',
   Timestamp('2016-01-04 09:20:45'),
   Timestamp('2016-01-04 09:20:45'),
   9807],
  [580,
   '18673605_161215',
   Timestamp('2016-01-04 09:32:45'),
   Timestamp('2016-01-04 09:32:45'),
   9807],
  [581,
   '18673606_161215',
   Timestamp('2016-01-04 09:44:45'),
   Timestamp('

In [304]:
dict(list(stops_to_nodes.items()))

{12834: [[573,
   '18673598_161215',
   Timestamp('2016-01-04 08:16:45'),
   Timestamp('2016-01-04 08:16:45'),
   9807],
  [574,
   '18673599_161215',
   Timestamp('2016-01-04 08:25:45'),
   Timestamp('2016-01-04 08:25:45'),
   9807],
  [575,
   '18673600_161215',
   Timestamp('2016-01-04 08:35:45'),
   Timestamp('2016-01-04 08:35:45'),
   9807],
  [576,
   '18673601_161215',
   Timestamp('2016-01-04 08:45:45'),
   Timestamp('2016-01-04 08:45:45'),
   9807],
  [577,
   '18673602_161215',
   Timestamp('2016-01-04 08:56:45'),
   Timestamp('2016-01-04 08:56:45'),
   9807],
  [578,
   '18673603_161215',
   Timestamp('2016-01-04 09:08:45'),
   Timestamp('2016-01-04 09:08:45'),
   9807],
  [579,
   '18673604_161215',
   Timestamp('2016-01-04 09:20:45'),
   Timestamp('2016-01-04 09:20:45'),
   9807],
  [580,
   '18673605_161215',
   Timestamp('2016-01-04 09:32:45'),
   Timestamp('2016-01-04 09:32:45'),
   9807],
  [581,
   '18673606_161215',
   Timestamp('2016-01-04 09:44:45'),
   Timestamp('

In [305]:
DEPARTURE_INDEX = 3
for stop, nodes in stops_to_nodes.items():
    stops_to_nodes[stop] = sorted(nodes, key=lambda x: x[DEPARTURE_INDEX])

In [306]:
total_values = 0
max_nodes_in_stop = 0
for n in stops_to_nodes.values():
    num_nodes = len(n)
    if num_nodes > max_nodes_in_stop:
        max_nodes_in_stop = num_nodes
    total_values += num_nodes
print(f'There is a total of {total_values} nodes in the stops_to_nodes dictionary')

There is a total of 1139 nodes in the stops_to_nodes dictionary


In [307]:
len(stops_to_nodes)

64

In [308]:
print(f'We have an average of {total_values/len(stops_to_nodes)}, and a max of {max_nodes_in_stop} nodes in a single stop.')

We have an average of 17.796875, and a max of 35 nodes in a single stop.


In [309]:
ARRIVAL_INDEX = 2
DEPARTURE_INDEX = 3
ROUTE_ID_INDEX = 4
def get_transfer_edges_from_nodes(stops_to_nodes_batch):
    print('Processing batch...')
    stops_to_nodes_batch = dict(stops_to_nodes_batch)
    transfer_edges = []
    for stop, nodes in tqdm(stops_to_nodes_batch.items()):
#         print(f'Handling stop: {stop}')
        for start_n in nodes:
#             print(f'Start node: {start_n}')
            nearby_stops_df = stops_mapping_df[stops_mapping_df['from_stop_id'] == stop]
            # Add current stop to check transfers from the same stop
            nearby_stops_df.append({'from_stop_id': [stop], 'to_stop_id': [stop], 'dist': [0], 'walk_time_sec': [0]}, ignore_index=True)
            # TODO: verify this we're not staying to the same line in same direction
#             print(f'Going though nearby stops')
            for s in nearby_stops_df.iterrows():
                nearby_stop_id = s[1]['to_stop_id']
#                 print(f'Nearby stop: {nearby_stop_id}')
                if nearby_stop_id not in stops_to_nodes:
                    # Some stops don't have trips that operate all week. Some operate only on weekends.
                    # If this is such a stop we should continue to look at other stops, we won't find any
                    # nodes here.
                    continue
                nearby_nodes = stops_to_nodes[nearby_stop_id]
                second_line_earliest_start_time = start_n[ARRIVAL_INDEX] + dt.timedelta(seconds=s[1]['walk_time_sec'])
                second_line_latest_start_time = second_line_earliest_start_time + MAX_WAIT_TIME
                # Find index of first node that departs at least at second_start_time or later
                _, _, _, departures, _ = zip(*nearby_nodes)
                i = bisect.bisect_left(departures, second_line_earliest_start_time)
#                 print(f'Number of nodes in current stop: {len(nearby_nodes)}')
                while (i < len(nearby_nodes) and 
                       nearby_nodes[i][DEPARTURE_INDEX] >= second_line_earliest_start_time and 
                       nearby_nodes[i][DEPARTURE_INDEX] <= second_line_latest_start_time):
#                     print(f'Handling node with index {i}: {nearby_nodes[i]}')
                    node = nearby_nodes[i][0]
                    if node == start_n[0] or (nearby_nodes[i][ROUTE_ID_INDEX] == start_n[ROUTE_ID_INDEX] and s == stop):
                        # We don't wish to transfer to the same node (no self-edges). 
                        # Another case we wish to avoid is transferring to the same line (route__id) in the same stop.
                        i += 1
                        continue
                    transfer_edges.append((start_n[0], nearby_nodes[i][0], nearby_nodes[i][DEPARTURE_INDEX] - start_n[ARRIVAL_INDEX]))
                    i += 1
    return transfer_edges

In [310]:
# transfer_edges = run_parallel_dict(stops_to_nodes, 1000, get_transfer_edges_from_nodes)

In [311]:
transfer_edges = get_transfer_edges_from_nodes(stops_to_nodes)

Processing batch...


HBox(children=(IntProgress(value=0, max=64), HTML(value='')))




In [312]:
len(transfer_edges)

830

In [313]:
transfer_edges[:3]

[(583, 584, Timedelta('0 days 00:08:00')),
 (584, 585, Timedelta('0 days 00:08:00')),
 (585, 586, Timedelta('0 days 00:08:00'))]

## TODO: we should eliminate transfers of the same trip_id

In [314]:
transfer_edges_sec = []
for s, t, w in transfer_edges:
    transfer_edges_sec.append((s, t, w.total_seconds()))

In [315]:
transfer_edges_sec[0:5]

[(583, 584, 480.0),
 (584, 585, 480.0),
 (585, 586, 480.0),
 (586, 587, 480.0),
 (587, 588, 480.0)]

In [316]:
import pickle
with open(OUTPUT_PATH + FILES_PREFIX + '_transfer_edges.pkl', 'wb') as f:
    print(f'Saving transfers to file {f.name}')
    pickle.dump(transfer_edges_sec, f)

Saving transfers to file ../../../output_data/validation/test1/morning_transfer_edges.pkl
