Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize find paths #3

Merged
merged 2 commits into from
Jul 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 122 additions & 34 deletions fasttrips/Assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
limitations under the License.
"""
import Queue
import collections,datetime,math,os,random,sys
import collections,datetime,math,multiprocessing,os,random,sys
import numpy,pandas

from .Logger import FastTripsLogger
from .Logger import FastTripsLogger, setupLogging
from .Passenger import Passenger
from .Path import Path
from .Stop import Stop
Expand Down Expand Up @@ -220,7 +220,21 @@ def assign_passengers(FT, iteration):
"""
FastTripsLogger.info("**************************** GENERATING PATHS ****************************")
start_time = datetime.datetime.now()
num_paths_assigned = 0
process_list = []
todo_queue = multiprocessing.Queue()
done_queue = multiprocessing.Queue()
process_idx = 0

# Setup multiprocessing processes
for process_idx in range(1, 1+multiprocessing.cpu_count()):
FastTripsLogger.info("Starting worker process %2d" % process_idx)
process_list.append(multiprocessing.Process(target=find_trip_based_paths_process_worker,
args=(iteration, process_idx, FT.input_dir, FT.output_dir,
todo_queue, done_queue,
Assignment.ASSIGNMENT_TYPE==Assignment.ASSIGNMENT_TYPE_STO_ASGN)))
process_list[-1].start()

# send tasks to workers for processing
for passenger in FT.passengers:
passenger_id = passenger.passenger_id

Expand All @@ -230,17 +244,37 @@ def assign_passengers(FT, iteration):
num_paths_assigned += 1
continue

trace_passenger = False
if passenger_id in Assignment.TRACE_PASSENGER_IDS:
FastTripsLogger.debug("Tracing assignment of passenger %s" % str(passenger_id))
trace_passenger = True
todo_queue.put( (passenger_id, passenger.path) )
# we're done, let each process know
for process_idx in range(len(process_list)):
todo_queue.put('DONE')

# get results
done_procs = 0
num_paths_assigned = 0
while done_procs < len(process_list):

asgn_iters = Assignment.find_trip_based_path(FT, passenger.path,
hyperpath=Assignment.ASSIGNMENT_TYPE==Assignment.ASSIGNMENT_TYPE_STO_ASGN,
trace=trace_passenger)
result = done_queue.get()
if result == 'DONE':
FastTripsLogger.info("Received done")
done_procs += 1

if passenger.path.path_found():
num_paths_assigned += 1
else:
passenger_id = result[0]
path_id = result[1]
asgn_iters = result[2]
return_states = result[3]

# find passenger with path_id and set return
# todo - inefficient. fix.
for passenger in FT.passengers:
if passenger.path.path_id == path_id:
passenger.path.states = return_states

if passenger.path.path_found():
num_paths_assigned += 1

break

if True or num_paths_assigned % 1000 == 0:
time_elapsed = datetime.datetime.now() - start_time
Expand All @@ -251,6 +285,11 @@ def assign_passengers(FT, iteration):
time_elapsed.total_seconds() % 60))
elif num_paths_assigned % 50 == 0:
FastTripsLogger.debug("%6d / %6d passenger paths assigned" % (num_paths_assigned, len(FT.passengers)))

# join up my processes
for proc in process_list:
proc.join()

return num_paths_assigned

@staticmethod
Expand Down Expand Up @@ -733,25 +772,25 @@ def find_trip_based_path(FT, path, hyperpath, trace):
Path.state_str(end_taz_id, taz_state[-1])))

# Put results into path
path.reset_states()
return_states = collections.OrderedDict()

# Nothing found
if len(taz_state) == 0: return label_iterations
if len(taz_state) == 0: return (label_iterations, return_states)

if hyperpath:
# Choose path and save those results
path_found = False
attempts = 0
while not path_found and attempts < Assignment.MAX_HYPERPATH_ASSIGN_ATTEMPTS:
path_found = Assignment.choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states)
(path_found, return_states) = Assignment.choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states)
attempts += 1

if not path_found:
path.reset_states()
return_states = collections.OrderedDict()
continue
else:
path.states[end_taz_id] = taz_state[0]
stop_state = taz_state[0]
return_states[end_taz_id] = taz_state[0]
stop_state = taz_state[0]
if path.outbound(): # outbound: egress to access and back
final_state_type = Path.STATE_MODE_EGRESS
else: # inbound: access to egress and back
Expand All @@ -760,20 +799,21 @@ def find_trip_based_path(FT, path, hyperpath, trace):

stop_id = stop_state[Path.STATE_IDX_SUCCPRED]
stop_state = stop_states[stop_id][0]
path.states[stop_id] = stop_state
return_states[stop_id] = stop_state

if trace: FastTripsLogger.debug("Final path:\n%s" % str(path))
return label_iterations
if trace: FastTripsLogger.debug("Final path:\n%s" % str(Path.states_to_str(return_states, path.direction)))
return (label_iterations, return_states)

@staticmethod
def choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states):
"""
Choose a path from the hyperpath states.
Returns True if path is set, False if we failed.
Returns (path found bool, ordered dict of states)
"""
taz_label = taz_state[-1][Path.STATE_IDX_LABEL]
cost_cutoff = 1 # taz_label - math.log(0.001)
access_cum_prob = [] # (cum_prob, state)
return_states = collections.OrderedDict()
# Setup access probabilities
for state in taz_state:
prob = int(1000.0*math.exp(-1.0*Assignment.DISPERSION_PARAMETER*state[Path.STATE_IDX_COST])/ \
Expand All @@ -796,15 +836,15 @@ def choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states):
start_state_id = path.destination_taz_id
dir_factor = -1

path.states[start_state_id] = Assignment.choose_state(access_cum_prob, trace)
if trace: FastTripsLogger.debug(" -> Chose %s" % Path.state_str(start_state_id, path.states[start_state_id]))
return_states[start_state_id] = Assignment.choose_state(access_cum_prob, trace)
if trace: FastTripsLogger.debug(" -> Chose %s" % Path.state_str(start_state_id, return_states[start_state_id]))

current_stop = path.states[start_state_id][Path.STATE_IDX_SUCCPRED]
current_stop = return_states[start_state_id][Path.STATE_IDX_SUCCPRED]
# outbound: arrival time
# inbound: departure time
arrdep_time = path.states[start_state_id][Path.STATE_IDX_DEPARR] + \
(path.states[start_state_id][Path.STATE_IDX_LINKTIME]*dir_factor)
last_trip = path.states[start_state_id][Path.STATE_IDX_DEPARRMODE]
arrdep_time = return_states[start_state_id][Path.STATE_IDX_DEPARR] + \
(return_states[start_state_id][Path.STATE_IDX_LINKTIME]*dir_factor)
last_trip = return_states[start_state_id][Path.STATE_IDX_DEPARRMODE]
while True:
# setup probabilities
if trace: FastTripsLogger.debug("current_stop=%8s; %s_time=%s; last_trip=%s" % \
Expand Down Expand Up @@ -834,7 +874,7 @@ def choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states):
# Nope, dead end
if len(stop_cum_prob) == 0:
# Try assignment again...
return False
return (False, return_states)

# denom found - cum prob time
for idx in range(len(stop_cum_prob)):
Expand All @@ -852,13 +892,13 @@ def choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states):
if trace: FastTripsLogger.debug(" -> Chose %s" % Path.state_str(current_stop, next_state))

# revise first link possibly -- let's not waste time
if path.outbound() and len(path.states) == 1:
if path.outbound() and len(return_states) == 1:
dep_time = datetime.datetime.combine(Assignment.TODAY,
FT.trips[next_state[Path.STATE_IDX_DEPARRMODE]].get_scheduled_departure(current_stop))
# effective trip start time
path.states[path.origin_taz_id][Path.STATE_IDX_DEPARR] = dep_time - path.states[path.origin_taz_id][Path.STATE_IDX_LINKTIME]
return_states[path.origin_taz_id][Path.STATE_IDX_DEPARR] = dep_time - return_states[path.origin_taz_id][Path.STATE_IDX_LINKTIME]

path.states[current_stop] = next_state
return_states[current_stop] = next_state
current_stop = next_state[Path.STATE_IDX_SUCCPRED]
last_trip = next_state[Path.STATE_IDX_DEPARRMODE]
if next_state[Path.STATE_IDX_DEPARRMODE] == Path.STATE_MODE_TRANSFER:
Expand All @@ -871,7 +911,7 @@ def choose_path_from_hyperpath_states(FT, path, trace, taz_state, stop_states):
if ( path.outbound() and next_state[Path.STATE_IDX_DEPARRMODE] == Path.STATE_MODE_EGRESS) or \
(not path.outbound() and next_state[Path.STATE_IDX_DEPARRMODE] == Path.STATE_MODE_ACCESS):
break
return True
return (True, return_states)

@staticmethod
def print_passenger_paths(passengers_df, output_dir):
Expand Down Expand Up @@ -1532,4 +1572,52 @@ def print_load_profile(veh_trips_df, output_dir):
sep="\t",
float_format="%.2f",
index=False)
load_file.close()
load_file.close()

def find_trip_based_paths_process_worker(iteration, worker_num, input_dir, output_dir, todo_path_queue, done_queue, hyperpath):
    """
    Process worker function.  Finds a path for every task pulled off *todo_path_queue*.

    Each task on *todo_path_queue* is a ``(passenger_id, path object)`` tuple.  The string
    ``'DONE'`` is the sentinel that tells this worker there is no more work.

    For every task processed, a ``(passenger_id, path_id, label iterations, return states)``
    tuple is put on *done_queue*.  When the worker stops -- because it received the sentinel,
    or because anything raised -- it puts exactly one ``'DONE'`` on *done_queue* so that a
    consumer counting ``'DONE'`` messages is never left waiting on a dead worker.

    :param iteration: Assignment iteration number.  Only used to decide whether the worker's
                      log files are appended to (``iteration > 1``) or overwritten.
    :param worker_num: Number identifying this worker; used in the log filename suffix
                       (``_workerNN``) and in log messages.
    :param input_dir: Passed to the worker's own FastTrips instance as the input directory.
    :type input_dir: string
    :param output_dir: Passed to the worker's own FastTrips instance as the output directory.
    :type output_dir: string
    :param todo_path_queue: Queue of tasks to process, terminated by ``'DONE'``.
    :param done_queue: Queue on which results and the final ``'DONE'`` are put.
    :param hyperpath: Passed through to :py:meth:`Assignment.find_trip_based_path`.
    :type hyperpath: bool
    """
    worker_str = "_worker%02d" % worker_num

    try:
        # Setup a new FT instance for this worker.
        # This is just for reading input files into the FT structures,
        # but it won't change the FT structures themselves (so it's a read-only instance).
        #
        # You'd think we could have just passed the FT structure to this method, but that would
        # involve pickling/unpickling the data, which makes starting a new process take a
        # *really long time* (~2 minutes per process).
        # Simply reading the input files again is faster.  No need to read the demand though.
        #
        # This setup lives inside the try block on purpose: if it fails, the finally clause
        # below still signals 'DONE' rather than leaving the consumer blocked.
        from .FastTrips import FastTrips
        worker_FT = FastTrips(input_dir=input_dir, output_dir=output_dir, read_demand=False,
                              log_to_console=False, logname_append=worker_str,
                              appendLog=(iteration > 1))

        FastTripsLogger.info("Worker %2d starting" % worker_num)

        while True:
            # go through my queue -- check if we're done
            todo = todo_path_queue.get()
            if todo == 'DONE':
                return

            # do the work
            passenger_id = todo[0]
            path         = todo[1]

            FastTripsLogger.info("Processing passenger %s path %s" % (str(passenger_id), str(path.path_id)))

            trace_passenger = passenger_id in Assignment.TRACE_PASSENGER_IDS
            if trace_passenger:
                FastTripsLogger.debug("Tracing assignment of passenger %s" % str(passenger_id))

            (asgn_iters, return_states) = Assignment.find_trip_based_path(worker_FT, path, hyperpath, trace=trace_passenger)
            done_queue.put( (passenger_id, path.path_id, asgn_iters, return_states) )

    except Exception:
        # Log it and call it a day: this worker stops at the first failure.
        # KeyboardInterrupt/SystemExit are deliberately not swallowed here.
        FastTripsLogger.exception('Exception')

    finally:
        # Always tell the consumer this worker is finished, whatever the exit path.
        done_queue.put('DONE')
54 changes: 46 additions & 8 deletions fasttrips/FastTrips.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
See the License for the specific language governing permissions and
limitations under the License.
"""

import os
from operator import attrgetter

from .Assignment import Assignment
from .Logger import FastTripsLogger
from .Logger import FastTripsLogger, setupLogging
from .Passenger import Passenger
from .Route import Route
from .Stop import Stop
Expand All @@ -28,9 +28,33 @@ class FastTrips:
This is the model itself. Should be simple and run pieces and store the big data structures.
"""

def __init__(self, input_dir):
#: Info log filename. Writes brief information about program progression here.
INFO_LOG = "ft_info%s.log"

#: Debug log filename. Detailed output goes here, including trace information.
DEBUG_LOG = "ft_debug%s.log"

def __init__(self, input_dir, output_dir, read_demand=True, log_to_console=True, logname_append="", appendLog=False):
"""
Constructor. Reads input files from *input_dir*.
Constructor.

Reads input files from *input_dir*.
Writes output files to *output_dir*, including log files.

:param input_dir: Location of csv files to read
:type input_dir: string
:param output_dir: Location to write output and log files.
:type output_dir: string
:param read_demand: Read passenger demand? For parallelization, workers don't need to
read demand since the main process will tell them what to do.
:type read_demand: bool
:param log_to_console: Log info to console as well as info log?
:type log_to_console: bool
:param logname_append: Modifier for info and debug log filenames. So workers can write their own logs.
:type logname_append: string
:param appendLog: Append to info and debug logs? When FastTrips runs multiple assignment iterations (to
handle capacity bumps), we'd like to append rather than overwrite.
:type appendLog: bool
"""
#: :py:class:`list` of :py:class:`fasttrips.Passenger` instances
self.passengers = None
Expand All @@ -47,9 +71,20 @@ def __init__(self, input_dir):
#: :py:class:`dict` with :py:attr:`fasttrips.Trip.trip_id` key and :py:class:`fasttrips.Trip` value
self.trips = None

self.read_input_files(input_dir)
#: string representing directory with input data
self.input_dir = input_dir

#: string representing directory in which to write our output
self.output_dir = output_dir

# setup logging
setupLogging(os.path.join(self.output_dir, FastTrips.INFO_LOG % logname_append),
os.path.join(self.output_dir, FastTrips.DEBUG_LOG % logname_append),
logToConsole=log_to_console, append=appendLog)

self.read_input_files(input_dir, read_demand)

def read_input_files(self, input_dir):
def read_input_files(self, input_dir, read_demand):
"""
Reads in the input files from *input_dir* and initializes the relevant data structures.
"""
Expand Down Expand Up @@ -78,8 +113,11 @@ def read_input_files(self, input_dir):
# read the access links into both the TAZs and the stops involved
TAZ.read_access_links(input_dir, self.tazs, self.stops)

# Read the demand int passenger_id -> passenger instance
self.passengers = Passenger.read_demand(input_dir)
if read_demand:
# Read the demand int passenger_id -> passenger instance
self.passengers = Passenger.read_demand(input_dir)
else:
self.passengers = None

def run_assignment(self, output_dir):
# Do it!
Expand Down
Loading