Merge 420ea9f into 935ca51
deanthedream committed Mar 11, 2019
2 parents 935ca51 + 420ea9f commit aacb181
Showing 16 changed files with 593 additions and 2,961 deletions.
156 changes: 89 additions & 67 deletions EXOSIMS/Prototypes/SurveySimulation.py

Large diffs are not rendered by default.

27 changes: 24 additions & 3 deletions EXOSIMS/Prototypes/TargetList.py
@@ -142,7 +142,7 @@ def __init__(self, missionStart=60634, staticStars=True,

#if specs contains a completeness_spec then we are going to generate separate instances
#of planet population and planet physical model for completeness and for the rest of the sim
if specs.has_key('completeness_specs'):
if 'completeness_specs' in specs:
self.PlanetPopulation = get_module(specs['modules']['PlanetPopulation'],'PlanetPopulation')(**specs)
self.PlanetPhysicalModel = self.PlanetPopulation.PlanetPhysicalModel
else:
@@ -159,9 +159,11 @@ def __init__(self, missionStart=60634, staticStars=True,
# generate any completeness update data needed
self.Completeness.gen_update(self)
self.filter_target_list(**specs)
# have target list, no need for catalog now
if not keepStarCatalog:

# have target list, no need for catalog now (unless asked to retain)
if not self.keepStarCatalog:
self.StarCatalog = specs['modules']['StarCatalog']

# add nStars to outspec
self._outspec['nStars'] = self.nStars

@@ -766,3 +768,22 @@ def stellarTeff(self, sInds):
Teff = 4600.0*u.K * (1.0/(0.92*self.BV[sInds] + 1.7) + 1.0/(0.92*self.BV[sInds] + 0.62))

return Teff

def dump_catalog(self):
"""Creates a dictionary of stellar properties for archiving use.
Args:
None
Returns:
catalog (dict):
Dictionary of star catalog properties
"""
atts = ['Name', 'Spec', 'parx', 'Umag', 'Bmag', 'Vmag', 'Rmag', 'Imag', 'Jmag', 'Hmag', 'Kmag', 'dist', 'BV', 'MV', 'BC', 'L', 'coords', 'pmra', 'pmdec', 'rv', 'Binary_Cut', 'MsEst', 'MsTrue', 'comp0', 'tint0']
# atts = ['Name', 'Spec', 'parx', 'Umag', 'Bmag', 'Vmag', 'Rmag', 'Imag', 'Jmag', 'Hmag', 'Kmag', 'dist', 'BV', 'MV', 'BC', 'L', 'coords', 'pmra', 'pmdec', 'rv', 'Binary_Cut']
#Not sure if MsTrue and others can be dumped properly...

catalog = {atts[i]: getattr(self,atts[i]) for i in np.arange(len(atts))}

return catalog
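
For orientation: the stellarTeff context above uses a B-V based effective-temperature fit, and the new dump_catalog simply collects the listed attributes into a plain dict. A minimal usage sketch, assuming an already-built TargetList instance called TL here (TL and the printed attributes are illustrative, not part of the commit):

import astropy.units as u

# Sanity check of the Teff fit for a roughly solar color, B-V ~ 0.65:
# 4600 K * (1/(0.92*0.65 + 1.7) + 1/(0.92*0.65 + 0.62)) ~ 5778 K
BV = 0.65
Teff = 4600.0*u.K * (1.0/(0.92*BV + 1.7) + 1.0/(0.92*BV + 0.62))
print(Teff)  # roughly 5.78e3 K

# Snapshot the star catalog properties for archiving
catalog = TL.dump_catalog()  # dict keyed by attribute name, e.g. 'Name', 'Vmag'
print(catalog['Name'][:3], catalog['Vmag'][:3])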
51 changes: 46 additions & 5 deletions EXOSIMS/SurveyEnsemble/IPClusterEnsemble.py
@@ -16,6 +16,7 @@
import cPickle
import random
import traceback
import subprocess


class IPClusterEnsemble(SurveyEnsemble):
@@ -34,7 +35,7 @@ def __init__(self, **specs):
self.dview = self.rc[:]
self.dview.block = True
with self.dview.sync_imports(): import EXOSIMS, EXOSIMS.util.get_module, \
os, os.path, time, random, cPickle, traceback
os, os.path, time, random, pickle, traceback, numpy
if specs.has_key('logger'):
specs.pop('logger')
if specs.has_key('seed'):
@@ -61,7 +62,7 @@ def run_ensemble(self, sim, nb_run_sim, run_one=None, genNewPlanets=True,
sim:
"""

hangingRunsOccured = False # keeps track of whether hanging runs have occured
t1 = time.time()
async_res = []
for j in range(nb_run_sim):
@@ -71,6 +72,12 @@ def run_ensemble(self, sim, nb_run_sim, run_one=None, genNewPlanets=True,

print("Submitted %d tasks."%len(async_res))

engine_pids = self.rc[:].apply(os.getpid).get_dict()
#ar2 = self.lview.apply_async(os.getpid)
#pids = ar2.get_dict()
print('engine_pids')
print(engine_pids)

runStartTime = time.time()#create job starting time
avg_time_per_run = 0.
tmplenoutstandingset = nb_run_sim
@@ -98,17 +105,51 @@ def run_ensemble(self, sim, nb_run_sim, run_one=None, genNewPlanets=True,
tmplenoutstandingset = len(outstandingset)#update this. should decrease by ~1 or number of cores...
tLastRunFinished = time.time()#update tLastRunFinished to the last time a simulation finished (right now)
#self.vprint("tmplenoutstandingset %d, tLastRunFinished %0.6f"%(tmplenoutstandingset,tLastRunFinished))
if time.time() - tLastRunFinished > avg_time_per_run*(1 + self.maxNumEngines*2):
if time.time() - tLastRunFinished > avg_time_per_run*(1. + self.maxNumEngines*2.)*4.:
#nb_run_sim = len(self.rc.outstanding)
#restartRuns = True
self.vprint('Aborting ' + str(len(self.rc.outstanding)) + 'qty outstandingset jobs')
self.rc.abort()#by default should abort all outstanding jobs... #it is possible that this will not stop the jobs running
#runningPIDS = os.listdir('/proc') # get all running pids
self.vprint('queue_status')
self.vprint(str(self.rc.queue_status()))
self.rc.abort()
ar.wait(20)
runningPIDS = [int(tpid) for tpid in os.listdir('/proc') if tpid.isdigit()]
#[self.rc.queue_status()[eind] for eind in np.arange(self.maxNumEngines) if self.rc.queue_status()[eind]['tasks']>0]
for engineInd in [eind for eind in np.arange(self.maxNumEngines) if self.rc.queue_status()[eind]['tasks']>0]:
os.kill(engine_pids[engineInd],15)
time.sleep(20)
# for pid in [engine_pids[eind] for eind in np.arange(len(engine_pids))]:
# if pid in runningPIDS:
# os.kill(pid,9) # send kill command to stop this worker
stopIPClusterCommand = subprocess.Popen(['ipcluster','stop'])
stopIPClusterCommand.wait()
time.sleep(60) # doing this instead of waiting for ipcluster to terminate
stopIPClusterCommand = subprocess.Popen(['ipcluster','stop'])
stopIPClusterCommand.wait()
time.sleep(60) # doing this instead of waiting for ipcluster to terminate
hangingRunsOccured = True # keeps track of whether hanging runs have occured
break
#stopIPClusterCommand.wait() # waits for process to terminate
#call(["ipcluster","stop"]) # send command to stop ipcluster
#self.rc.abort(jobs=self.rc.outstanding.copy().pop())
#self.rc.abort()#by default should abort all outstanding jobs... #it is possible that this will not stop the jobs running
#ar.wait(100)
#self.rc.purge_everything() # purge all results if outstanding *because rc.abort() didn't seem to do the job right
tLastRunFinished = time.time()#update tLastRunFinished to the last time a simulation was restarted (right now)

print("%4i/%i tasks finished after %4i s. About %s to go." % (ar.progress, nb_run_sim, ar.elapsed, timeleftstr), end="")
sys.stdout.flush()
#numRunStarts += 1 # increment number of run restarts



t2 = time.time()
print("\nCompleted in %d sec" % (t2 - t1))

res = [ar.get() for ar in async_res]
if hangingRunsOccured: #hanging runs have occured
res = [1]
else:
res = [ar.get() for ar in async_res]

return res
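
Taken together, the added logic acts as a watchdog for hung ensemble runs: engine PIDs are recorded up front, and if no task finishes within roughly four times avg_time_per_run*(1 + 2*maxNumEngines), the outstanding jobs are aborted, engines still reporting queued tasks are killed, ipcluster is stopped, and the ensemble returns [1] instead of the gathered results. A stripped-down sketch of that pattern outside the EXOSIMS classes (the function name and stall_limit parameter are illustrative only; rc is an ipyparallel Client and engine_pids comes from rc[:].apply(os.getpid).get_dict() as in the diff):

import os
import subprocess
import time

def watch_for_hangs(rc, engine_pids, stall_limit):
    # Poll outstanding jobs; tear the cluster down if nothing finishes for stall_limit seconds.
    t_last_finished = time.time()
    n_outstanding = len(rc.outstanding)
    while rc.outstanding:
        time.sleep(1)
        if len(rc.outstanding) < n_outstanding:
            # At least one job finished; reset the stall timer.
            n_outstanding = len(rc.outstanding)
            t_last_finished = time.time()
        elif time.time() - t_last_finished > stall_limit:
            rc.abort()  # drop outstanding work (may not stop tasks already running)
            for eid, pid in engine_pids.items():
                if rc.queue_status()[eid]['tasks'] > 0:
                    os.kill(pid, 15)  # SIGTERM engines that still hold tasks
            subprocess.Popen(['ipcluster', 'stop']).wait()
            return True  # hanging runs occurred
    return False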
