Skip to content
Browse files

stereo_mpi: Add timeout for correlation option

  • Loading branch information...
1 parent c5f72d9 commit 50fd54998da4af46c958153702efa354f5068a4f @oleg-alexandrov oleg-alexandrov committed Mar 31, 2013
Showing with 49 additions and 14 deletions.
  1. +49 −14 src/asp/Tools/stereo_mpi.in
View
63 src/asp/Tools/stereo_mpi.in
@@ -19,10 +19,10 @@
import sys, optparse, subprocess, re, os, math, time
import os.path as P
-job_pool = []
+job_pool = [] # currently running jobs
-# Utilities to ensure that the Python parser does not garble negative integers
-# such as '-365' into '-3'.
+# Utilities to ensure that the Python parser does not garble negative
+# integers such as '-365' into '-3'.
escapeStr='esc_rand_str'
def escape_vals(vals):
for index, val in enumerate(vals):
@@ -37,6 +37,32 @@ def unescape_vals(vals):
vals[index] = p.group(1)
return vals
# Launch a job using Popen. If that job is stereo_corr, enforce a
# timeout.
# TODO: Write a custom wait() function. For now, the timeout is
# enforced only if the poll() function is called periodically.
class popen_timeout(subprocess.Popen):
    """subprocess.Popen variant that kills stereo_corr jobs running too long.

    The limit is read from the module-level opt.corr_timeout (in seconds;
    None disables the check) each time poll() is called, so callers must
    keep polling for the timeout to take effect. Jobs whose program name
    does not contain 'stereo_corr' are never timed out.
    """

    prog_name = ""     # first element of the command passed to Popen
    start_time = 0     # time.time() taken just before the job is launched
    timed_out = False  # True once the timeout kill has been issued

    def __init__(self, *args, **kwargs):
        """Record the command and the launch time, then start the process.

        All arguments are forwarded unchanged to subprocess.Popen. The
        first positional argument must be the command given as a sequence
        of strings, since its first element is taken as the program name.
        """
        self.cmd = args[0]
        self.prog_name = args[0][0]
        self.start_time = time.time()
        super(popen_timeout, self).__init__(*args, **kwargs)

    def poll(self):
        """Return the exit status, or None while the job is still running.

        A running stereo_corr job that has exceeded opt.corr_timeout is
        reported on stdout and killed; this happens at most once per job.
        """
        status = super(popen_timeout, self).poll()
        if status is not None or self.timed_out:
            # Already finished, or the kill was already issued: there is
            # nothing left to enforce.
            return status

        if not re.match('^.*?stereo_corr', self.prog_name):
            # Only stereo_corr is subject to the timeout.
            return status

        elapsed = time.time() - self.start_time
        if opt.corr_timeout is None or elapsed <= opt.corr_timeout:
            return status

        # Set the flag before killing: recent Python versions call
        # self.poll() from inside send_signal(), which would otherwise
        # re-enter this branch and recurse without bound.
        self.timed_out = True
        print('Job timeout (' + str(opt.corr_timeout) + 'sec): ' + " ".join(self.cmd))
        super(popen_timeout, self).kill()

        return super(popen_timeout, self).poll()
+
# Custom option parser that will ignore unknown options
class PassThroughOptionParser(optparse.OptionParser):
def _process_args( self, largs, rargs, values ):
@@ -96,25 +122,30 @@ def produce_tiles( settings, tile_w, tile_h ):
return tiles;
def add_job( cmd ):
    """Launch cmd as a popen_timeout job once the pool has a free slot.

    While job_pool already holds opt.processes jobs, the jobs are polled
    in order; the first one found finished is dropped from the pool and
    cmd is started in its place. If none has finished, the function
    sleeps and tries again.
    """
    # This code could be smarter. It is possible that the processes
    # might finish out of order.
    delay = 0.001
    while len(job_pool) >= opt.processes:
        for slot, job in enumerate(job_pool):
            if job.poll() is None:
                continue  # still running, look at the next one
            # Found a finished job: swap it for the new one and stop.
            job_pool.pop(slot)
            job_pool.append( popen_timeout(cmd) )
            return
        time.sleep( delay )
        # The delay grows five-fold each round and wraps around modulo
        # 60 rather than being capped.
        delay = (delay * 5) % 60
    job_pool.append( popen_timeout(cmd) )
def wait_on_all_jobs():
    """Block until every job in job_pool has finished, emptying the pool.

    The jobs are polled rather than waited on, because the correlation
    timeout is enforced only from within poll(). Every finished job is
    removed on each pass, and the function sleeps only while some job
    is still running.
    """
    print("Waiting for jobs to finish")
    # Must keep on polling the jobs to enforce the timeout
    poll_interval = 1
    while job_pool:
        # Rebuild the list in place so the module-level job_pool object
        # itself is updated; each job is polled exactly once per pass.
        job_pool[:] = [job for job in job_pool if job.poll() is None]
        if job_pool:
            time.sleep( poll_interval )
def wipe_existing_threads_arg(call):
# Before inserting a '--threads val' option
@@ -284,6 +315,8 @@ if __name__ == '__main__':
help='Pixel width size for a single subprocess', type='int')
p.add_option('--job-size-h', dest='job_size_h', default=2048,
help='Pixel height size for a single subprocess', type='int')
+ p.add_option('--correlation-timeout', dest='corr_timeout', default=None, type='int',
+ help='Timeout in seconds for a stereo correlation process.')
p.add_option('--dry-run', dest='dryrun', default=False, action='store_true',
help=optparse.SUPPRESS_HELP)
p.add_option('--debug', dest='debug', default=False, action='store_true',
@@ -322,8 +355,9 @@ if __name__ == '__main__':
if opt.mpi_nodes is not None and opt.mpi_nodes > 1 and mpiexec_rank is None:
# Multi-machine execution. This means that the current running
# process has become the management process that spawns other
- # copies of itself on other systems. This thread will only do
- # work when we hit a non-multiprocess step like PPRC or FLTR.
+ # copies of itself on other machines. This block will only do
+ # actual work when we hit a non-multiprocess step like PPRC or
+ # FLTR.
create_subproject_dirs( settings )
@@ -384,7 +418,8 @@ if __name__ == '__main__':
else:
- # Single machine execution
+ # Launch multiple processes doing actual work on the current
+ # machine.
try:
if ( opt.entry_point <= 0 ):

0 comments on commit 50fd549

Please sign in to comment.
Something went wrong with that request. Please try again.