Skip to content

Commit

Permalink
Detect if child process has crashed (Issue#27)
Browse files Browse the repository at this point in the history
Unfortunately it's not easy to figure out what it was working on; the
STARTING message doesn't seem to get sent
  • Loading branch information
Lisa Zorn committed Apr 11, 2016
1 parent 7225b51 commit 52f1471
Showing 1 changed file with 77 additions and 42 deletions.
119 changes: 77 additions & 42 deletions fasttrips/Assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def generate_paths(FT, output_dir, iteration):
"""
FastTripsLogger.info("**************************** GENERATING PATHS ****************************")
start_time = datetime.datetime.now()
process_list = []
process_dict = {} # workernum -> {"process":process, "alive":alive bool, "done":done bool, "working_on":(person_id, trip_list_num)}
todo_queue = None
done_queue = None

Expand Down Expand Up @@ -397,12 +397,16 @@ def generate_paths(FT, output_dir, iteration):
done_queue = multiprocessing.Queue()
for process_idx in range(1, 1+num_processes):
FastTripsLogger.info("Starting worker process %2d" % process_idx)
process_list.append(multiprocessing.Process(target=find_trip_based_paths_process_worker,
args=(iteration, process_idx, FT.input_network_dir, FT.input_demand_dir,
FT.output_dir, todo_queue, done_queue,
Assignment.ASSIGNMENT_TYPE==Assignment.ASSIGNMENT_TYPE_STO_ASGN,
Assignment.bump_wait_df)))
process_list[-1].start()
process_dict[process_idx] = {
"process":multiprocessing.Process(target=find_trip_based_paths_process_worker,
args=(iteration, process_idx, FT.input_network_dir, FT.input_demand_dir,
FT.output_dir, todo_queue, done_queue,
Assignment.ASSIGNMENT_TYPE==Assignment.ASSIGNMENT_TYPE_STO_ASGN,
Assignment.bump_wait_df)),
"alive":True,
"done":False
}
process_dict[process_idx]["process"].start()
else:
Assignment.initialize_fasttrips_extension(0, output_dir, FT)

Expand Down Expand Up @@ -460,41 +464,70 @@ def generate_paths(FT, output_dir, iteration):
# multiprocessing follow-up
if num_processes > 1:
# we're done, let each process know
for process_idx in range(len(process_list)):
for process_idx in process_dict.keys():
todo_queue.put('DONE')

# get results
done_procs = 0
while done_procs < len(process_list):

result = done_queue.get()
if result == 'DONE':
FastTripsLogger.debug("Received done")
done_procs += 1

else:
trip_list_id = result[0]
path = FT.passengers.get_path(trip_list_id)
path.cost = result[1]
path.states = result[2]
perf_dict = result[3]

FT.performance.add_info(iteration, trip_list_id, perf_dict)

if path.path_found():
num_paths_found_now += 1

if num_paths_found_now % info_freq == 0:
time_elapsed = datetime.datetime.now() - start_time
FastTripsLogger.info(" %6d / %6d passenger paths found. Time elapsed: %2dh:%2dm:%2ds" % (
num_paths_found_now, est_paths_to_find,
int( time_elapsed.total_seconds() / 3600),
int( (time_elapsed.total_seconds() % 3600) / 60),
time_elapsed.total_seconds() % 60))
done_procs = 0 # where done means not alive
while done_procs < len(process_dict):

try:
result = done_queue.get(True, 30)
worker_num = result[0]

# FastTripsLogger.debug("Received %s" % str(result))
if result[1] == "DONE":
FastTripsLogger.debug("Received done from process %d" % worker_num)
process_dict[worker_num]["done"] = True
elif result[1] == "STARTING":
process_dict[worker_num]["working_on"] = (result[2],result[3])
elif result[1] == "COMPLETED":
trip_list_id = result[2]
path = FT.passengers.get_path(trip_list_id)
path.cost = result[3]
path.states = result[4]
perf_dict = result[5]

FT.performance.add_info(iteration, trip_list_id, perf_dict)

if path.path_found():
num_paths_found_now += 1

if num_paths_found_now % info_freq == 0:
time_elapsed = datetime.datetime.now() - start_time
FastTripsLogger.info(" %6d / %6d passenger paths found. Time elapsed: %2dh:%2dm:%2ds" % (
num_paths_found_now, est_paths_to_find,
int( time_elapsed.total_seconds() / 3600),
int( (time_elapsed.total_seconds() % 3600) / 60),
time_elapsed.total_seconds() % 60))

del process_dict[worker_num]["working_on"]
else:
print "Unexpected done queue contents: " + str(result)

except :
# FastTripsLogger.debug("Error: %s" % str(sys.exc_info()))
pass

# check if any processes are not alive
for process_idx in process_dict.keys():
if process_dict[process_idx]["alive"] and not process_dict[process_idx]["process"].is_alive():
FastTripsLogger.debug("Process %d is not alive" % process_idx)
process_dict[process_idx]["alive"] = False
done_procs += 1

# join up my processes
for proc in process_list:
proc.join()
for process_idx in process_dict.keys():
process_dict[process_idx]["process"].join()

# check if any processes crashed
for process_idx in process_dict.keys():
if not process_dict[process_idx]["done"]:
if "working_on" in process_dict[process_idx]:
FastTripsLogger.info("Process %d appears to have crashed; it was working on %s" % \
(process_idx, str(process_dict[process_idx]["working_on"])))
else:
FastTripsLogger.info("Process %d appears to have crashed; see ft_debug_worker%02d.log" % (process_idx, process_idx))

except (KeyboardInterrupt, SystemExit):
exc_type, exc_value, exc_tb = sys.exc_info()
Expand All @@ -503,7 +536,7 @@ def generate_paths(FT, output_dir, iteration):
for e in error_lines: FastTripsLogger.error(e)
FastTripsLogger.error("Terminating processes")
# terminating my processes
for proc in process_list:
for proc in process_dict:
proc.terminate()
sys.exit(2)
except:
Expand Down Expand Up @@ -1291,14 +1324,16 @@ def find_trip_based_paths_process_worker(iteration, worker_num, input_network_di
# go through my queue -- check if we're done
todo = todo_path_queue.get()
if todo == 'DONE':
done_queue.put('DONE')
done_queue.put( (worker_num, 'DONE') )
FastTripsLogger.debug("Received DONE from the todo_path_queue")
return

# do the work
path = todo

FastTripsLogger.info("Processing person %20s path %d" % (path.person_id, path.trip_list_id_num))
# communicate it to the parent
done_queue.put( (worker_num, "STARTING", path.person_id, path.trip_list_id_num ))

trace_person = False
if path.person_id in Assignment.TRACE_PERSON_IDS:
Expand All @@ -1307,9 +1342,9 @@ def find_trip_based_paths_process_worker(iteration, worker_num, input_network_di

try:
(cost, return_states, perf_dict) = Assignment.find_trip_based_path(iteration, worker_FT, path, hyperpath, trace=trace_person)
done_queue.put( (path.trip_list_id_num, cost, return_states, perf_dict) )
done_queue.put( (worker_num, "COMPLETED", path.trip_list_id_num, cost, return_states, perf_dict) )
except:
FastTripsLogger.exception('Exception')
FastTripsLogger.exception("Exception")
# call it a day
done_queue.put('DONE')
done_queue.put( (worker_num, "EXCEPTION", str(sys.exc_info()) ) )
return

0 comments on commit 52f1471

Please sign in to comment.