
Commit

Merge 74cbc93 into 2de7942
sbailey committed Dec 10, 2021
2 parents 2de7942 + 74cbc93 commit 62df241
Showing 7 changed files with 82 additions and 58 deletions.
4 changes: 2 additions & 2 deletions bin/desi_daily_proc_manager
@@ -47,8 +47,8 @@ def parse_args():#options=None):
"specified, default for daily processing is 'cumulative'. If "+
"'false' or 'None' then no redshifts are submitted")
parser.add_argument("--dry-run-level", type=int, default=0, required=False,
help="If nonzero, this is a simulated run. If dry_run=1 the scripts will be written or submitted. "+
"If dry_run=2, the scripts will not be writter or submitted. Logging will remain the same "+
help="If nonzero, this is a simulated run. If dry_run=1 the scripts will be written but not submitted. "+
"If dry_run=2, the scripts will not be written or submitted. Logging will remain the same "+
"for testing as though scripts are being submitted. Default is 0 (false).")
# File and dir defs
parser.add_argument("-s", "--specprod", type=str, required=False, default=None,
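As a minimal sketch of the two dry-run levels described in the help text above (the function and helpers here are hypothetical, not the desi_daily_proc_manager internals):

import subprocess

def submit_batch_script(scriptfile, dry_run_level=0):
    """Hypothetical illustration of the --dry-run-level semantics."""
    if dry_run_level >= 2:
        # dry_run=2: neither write nor submit; logging proceeds as usual.
        print(f'Would write and submit {scriptfile}')
        return
    with open(scriptfile, 'w') as fx:  # stand-in for real script generation
        fx.write('#!/bin/bash\n')
    if dry_run_level == 1:
        # dry_run=1: the script is written but not submitted.
        print(f'Wrote {scriptfile}; skipping sbatch')
        return
    subprocess.run(['sbatch', scriptfile], check=True)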
3 changes: 2 additions & 1 deletion py/desispec/desi_proc_dashboard.py
@@ -161,7 +161,8 @@ def main(args):

if args.output_dir is None:
if 'DESI_DASHBOARD' not in os.environ.keys():
os.environ['DESI_DASHBOARD']=os.environ["HOME"]
os.environ['DESI_DASHBOARD']=os.path.join(
os.environ["DESI_SPECTRO_REDUX"], os.environ["SPECPROD"], "dashboard")
args.output_dir = os.environ["DESI_DASHBOARD"]
else:
os.environ['DESI_DASHBOARD'] = args.output_dir
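Taken together with the branch above, the resolution order is: an explicit --output-dir first, then a pre-set $DESI_DASHBOARD, then the new default of $DESI_SPECTRO_REDUX/$SPECPROD/dashboard (previously $HOME). A standalone sketch of that logic, assuming the production environment variables are set:

import os

def resolve_dashboard_dir(output_dir=None):
    # Illustrative restatement of the fallback chain shown in the diff.
    if output_dir is None:
        if 'DESI_DASHBOARD' not in os.environ:
            os.environ['DESI_DASHBOARD'] = os.path.join(
                os.environ['DESI_SPECTRO_REDUX'], os.environ['SPECPROD'], 'dashboard')
        return os.environ['DESI_DASHBOARD']
    os.environ['DESI_DASHBOARD'] = output_dir
    return output_dir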
18 changes: 14 additions & 4 deletions py/desispec/scripts/proc.py
@@ -74,6 +74,7 @@ def main(args=None, comm=None):
# args = parse(args)

log = get_logger()
start_time = time.time()

start_mpi_connect = time.time()
if comm is not None:
@@ -147,7 +148,7 @@
# jobdesc = 'science'
scriptfile = create_desi_proc_batch_script(night=args.night, exp=args.expid, cameras=args.cameras,
jobdesc=jobdesc, queue=args.queue,
nightlybias=args.nightlybias, runtime=args.runtime,
runtime=args.runtime,
batch_opts=args.batch_opts, timingfile=args.timingfile,
system_name=args.system_name)
err = 0
@@ -1034,8 +1035,6 @@ def list2str(xx) :

if rank == 0:
stats = desiutil.timer.compute_stats(timers)
log.info('Timing summary statistics:\n' + json.dumps(stats, indent=2))

if args.timingfile:
if os.path.exists(args.timingfile):
with open(args.timingfile) as fx:
@@ -1052,6 +1051,17 @@ def list2str(xx) :
with open(tmpfile, 'w') as fx:
json.dump(stats, fx, indent=2)
os.rename(tmpfile, args.timingfile)
log.info(f'Timing stats saved to {args.timingfile}')

log.info('Timing max duration per step [seconds]:')
for stepname, steptiming in stats.items():
tmax = steptiming['duration.max']
log.info(f' {stepname:16s} {tmax:.2f}')

if rank == 0:
log.info('All done at {}'.format(time.asctime()))
duration_seconds = time.time() - start_time
mm = int(duration_seconds) // 60
ss = int(duration_seconds - mm*60)

log.info('All done at {}; duration {}m{}s'.format(
time.asctime(), mm, ss))
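
The timingfile block above follows a read-merge-rename pattern so a partially written file never replaces a good one. A self-contained sketch, assuming desiutil.timer.compute_stats returns a dict of per-step entries with keys like 'duration.max' (as the max-duration loop above implies):

import json
import os

def update_timing_file(timingfile, stats):
    # Merge new stats into any existing JSON timing file; the shallow
    # merge (new steps override old ones) is an assumption of this sketch.
    if os.path.exists(timingfile):
        with open(timingfile) as fx:
            previous = json.load(fx)
        previous.update(stats)
        stats = previous
    tmpfile = timingfile + '.tmp'
    with open(tmpfile, 'w') as fx:
        json.dump(stats, fx, indent=2)
    os.rename(tmpfile, timingfile)  # atomic within one POSIX filesystem

update_timing_file('timing.json', {'extract': {'duration.max': 345.6}})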
76 changes: 38 additions & 38 deletions py/desispec/scripts/proc_joint_fit.py
@@ -6,11 +6,12 @@
import subprocess
from copy import deepcopy
import json
import glob

import numpy as np
import fitsio
from astropy.io import fits
import glob

import desiutil.timer
import desispec.io
from desispec.io import findfile, replace_prefix
@@ -176,45 +177,44 @@ def main(args=None, comm=None):
if args.obstype in ['ARC']:
timer.start('psfnight')
num_cmd = num_err = 0
if rank == 0:
for camera in args.cameras:
psfnightfile = findfile('psfnight', args.night, args.expids[0], camera)
if not os.path.isfile(psfnightfile): # we still don't have a psf night, see if we can compute it ...
psfs = list()
for expid in args.expids:
psffile = findfile('fitpsf', args.night, expid, camera)
if os.path.exists(psffile):
psfs.append( psffile )
else:
log.warning(f'Missing {psffile}')

log.info("Number of PSF for night={} camera={} = {}/{}".format(
args.night, camera, len(psfs), len(args.expids)))
if len(psfs) >= 3: # lets do it!
log.info(f"Computing {camera} psfnight ...")
dirname = os.path.dirname(psfnightfile)
if not os.path.isdir(dirname):
os.makedirs(dirname)
num_cmd += 1

#- generic try/except so that any failure doesn't leave
#- MPI rank 0 hanging while others are waiting for it
try:
desispec.scripts.specex.mean_psf(psfs, psfnightfile)
except:
log.error('specex.meanpsf failed for {}'.format(os.path.basename(psfnightfile)))
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
log.error(''.join(lines))
sys.stdout.flush()

if not os.path.exists(psfnightfile):
log.error(f'Failed to create {psfnightfile}')
num_err += 1
for camera in args.cameras[rank::size]:
psfnightfile = findfile('psfnight', args.night, args.expids[0], camera)
if not os.path.isfile(psfnightfile): # we still don't have a psf night, see if we can compute it ...
psfs = list()
for expid in args.expids:
psffile = findfile('fitpsf', args.night, expid, camera)
if os.path.exists(psffile):
psfs.append( psffile )
else:
log.info(f"Fewer than 3 {camera} psfs were provided, can't compute psfnight. Exiting ...")
num_cmd += 1
log.warning(f'Missing {psffile}')

log.info("Number of PSF for night={} camera={} = {}/{}".format(
args.night, camera, len(psfs), len(args.expids)))
if len(psfs) >= 3: # lets do it!
log.info(f"Rank {rank} computing {camera} psfnight ...")
dirname = os.path.dirname(psfnightfile)
if not os.path.isdir(dirname):
os.makedirs(dirname)
num_cmd += 1

#- generic try/except so that any failure doesn't leave
#- MPI rank 0 hanging while others are waiting for it
try:
desispec.scripts.specex.mean_psf(psfs, psfnightfile)
except:
log.error('Rank {} specex.meanpsf failed for {}'.format(rank, os.path.basename(psfnightfile)))
exc_type, exc_value, exc_traceback = sys.exc_info()
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
log.error(''.join(lines))
sys.stdout.flush()

if not os.path.exists(psfnightfile):
log.error(f'Rank {rank} failed to create {psfnightfile}')
num_err += 1
else:
log.info(f"Fewer than 3 {camera} psfs were provided, can't compute psfnight. Exiting ...")
num_cmd += 1
num_err += 1

timer.stop('psfnight')

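The core change in this hunk drops the rank-0-only loop in favor of round-robin camera assignment via args.cameras[rank::size], so each MPI rank computes psfnight for its own strided subset of cameras. A minimal standalone sketch of the pattern (run with e.g. mpirun -n 3 python sketch.py):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size
cameras = ['b0', 'r0', 'z0', 'b1', 'r1', 'z1']  # toy camera list
num_err = 0

# Every rank takes a strided slice, so per-camera work runs
# concurrently instead of serializing on rank 0.
for camera in cameras[rank::size]:
    print(f'rank {rank}/{size} handles {camera}')

# Per-rank error counts can then be combined across ranks:
num_err = comm.allreduce(num_err, op=MPI.SUM)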
4 changes: 3 additions & 1 deletion py/desispec/scripts/specex.py
@@ -185,7 +185,7 @@ def main(args, comm=None):

com.extend(optarray)

log.debug("proc {} calling {}".format(rank, " ".join(com)))
log.info("proc {} calling {}".format(rank, " ".join(com)))

retval = run_specex(com)

@@ -194,6 +194,8 @@
log.error("desi_psf_fit on process {} failed with return "
"value {} running {}".format(rank, retval, comstr))
failcount += 1
else:
log.info(f"proc {rank} succeeded generating {outbundlefits}")

if comm is not None:
from mpi4py import MPI
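The MPI block truncated at the end of this hunk presumably combines failcount across ranks; a hedged sketch of that common pattern, not necessarily the exact desispec code:

from mpi4py import MPI

def global_failcount(failcount, comm=None):
    # Sum per-rank failure counts so every rank agrees on the outcome.
    if comm is not None:
        failcount = comm.allreduce(failcount, op=MPI.SUM)
    return failcount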
10 changes: 5 additions & 5 deletions py/desispec/scripts/tile_redshifts.py
@@ -366,7 +366,7 @@ def write_redshift_script(batchscript, outdir,
fi
if [ $NUM_CFRAMES -gt 0 ]; then
echo Grouping $NUM_CFRAMES cframes into $(basename $spectra), see $splog
cmd="srun -N 1 -n 1 -c {threads_per_node} desi_group_spectra --inframes $CFRAMES --outfile $spectra"
cmd="srun -N 1 -n 1 -c {threads_per_node} --cpu-bind=none desi_group_spectra --inframes $CFRAMES --outfile $spectra"
echo RUNNING $cmd &> $splog
$cmd &>> $splog &
sleep 0.5
@@ -391,7 +391,7 @@ def write_redshift_script(batchscript, outdir,
echo $(basename $coadd) already exists, skipping coadd
elif [ -f $spectra ]; then
echo Coadding $(basename $spectra) into $(basename $coadd), see $colog
cmd="srun -N 1 -n 1 -c {threads_per_node} desi_coadd_spectra {onetileopt} --nproc 16 -i $spectra -o $coadd"
cmd="srun -N 1 -n 1 -c {threads_per_node} --cpu-bind=none desi_coadd_spectra {onetileopt} --nproc 16 -i $spectra -o $coadd"
echo RUNNING $cmd &> $colog
$cmd &>> $colog &
sleep 0.5
@@ -418,7 +418,7 @@ def write_redshift_script(batchscript, outdir,
echo $(basename $redrock) already exists, skipping redshifts
elif [ -f $coadd ]; then
echo Running redrock on $(basename $coadd), see $rrlog
cmd="srun -N {redrock_nodes} -n {cores_per_node*redrock_nodes//redrock_cores_per_rank} -c {threads_per_core*redrock_cores_per_rank} rrdesi_mpi -i $coadd -o $redrock -d $rrdetails"
cmd="srun -N {redrock_nodes} -n {cores_per_node*redrock_nodes//redrock_cores_per_rank} -c {threads_per_core*redrock_cores_per_rank} --cpu-bind=cores rrdesi_mpi -i $coadd -o $redrock -d $rrdetails"
echo RUNNING $cmd &> $rrlog
$cmd &>> $rrlog &
sleep 0.5
@@ -486,7 +486,7 @@ def write_redshift_script(batchscript, outdir,
echo $(basename $qsomgii) already exists, skipping QSO MgII afterburner
elif [ -f $redrock ]; then
echo Running QSO MgII afterburner, see $qsomgiilog
cmd="srun -N 1 -n 1 -c {threads_per_node} desi_qso_mgii_afterburner --coadd $coadd --redrock $redrock --output $qsomgii --target_selection all --save_target all"
cmd="srun -N 1 -n 1 -c {threads_per_node} --cpu-bind=none desi_qso_mgii_afterburner --coadd $coadd --redrock $redrock --output $qsomgii --target_selection all --save_target all"
echo RUNNING $cmd &> $qsomgiilog
$cmd &>> $qsomgiilog &
sleep 0.5
@@ -499,7 +499,7 @@ def write_redshift_script(batchscript, outdir,
echo $(basename $qsoqn) already exists, skipping QSO QuasarNet afterburner
elif [ -f $redrock ]; then
echo Running QSO QuasarNet afterburner, see $qsoqnlog
cmd="srun -N 1 -n 1 -c {threads_per_node} desi_qso_qn_afterburner --coadd $coadd --redrock $redrock --output $qsoqn --target_selection all --save_target all"
cmd="srun -N 1 -n 1 -c {threads_per_node} --cpu-bind=none desi_qso_qn_afterburner --coadd $coadd --redrock $redrock --output $qsoqn --target_selection all --save_target all"
echo RUNNING $cmd &> $qsoqnlog
$cmd &>> $qsoqnlog &
sleep 0.5
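Every change in this file adds an explicit --cpu-bind to srun: none for the single-task multiprocessing helpers (desi_group_spectra, desi_coadd_spectra, and the QSO afterburners) and cores for the MPI rrdesi_mpi step. A small sketch of that rule as a helper (the function is hypothetical, not part of desispec):

def srun_prefix(nodes, ntasks, cpus_per_task, mpi=False):
    # Multiprocessing steps want --cpu-bind=none so forked workers may
    # roam over all allocated CPUs; MPI ranks are pinned to cores.
    bind = 'cores' if mpi else 'none'
    return f'srun -N {nodes} -n {ntasks} -c {cpus_per_task} --cpu-bind={bind}'

print(srun_prefix(1, 1, 64))            # srun -N 1 -n 1 -c 64 --cpu-bind=none
print(srun_prefix(2, 64, 4, mpi=True))  # srun -N 2 -n 64 -c 4 --cpu-bind=cores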
25 changes: 18 additions & 7 deletions py/desispec/workflow/desi_proc_funcs.py
@@ -310,9 +310,9 @@ def determine_resources(ncameras, jobdesc, queue, nexps=1, forced_runtime=None,
elif jobdesc in ('ZERO'):
ncores, runtime = 2, 5
elif jobdesc == 'PSFNIGHT':
ncores, runtime = 20 * nspectro, 5 #ncameras, 5
ncores, runtime = ncameras, 5
elif jobdesc == 'NIGHTLYFLAT':
ncores, runtime = 20 * nspectro, 20 #ncameras, 5
ncores, runtime = ncameras, 5
elif jobdesc in ('STDSTARFIT'):
ncores, runtime = 20 * ncameras, (6+2*nexps) #ncameras, 10
else:
@@ -406,7 +406,7 @@ def get_desi_proc_batch_file_pathname(night, exp, jobdesc, cameras, reduxdir=Non
return os.path.join(path, name)


def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, nightlybias=False, runtime=None, batch_opts=None,\
def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, runtime=None, batch_opts=None,\
timingfile=None, batchdir=None, jobname=None, cmdline=None, system_name=None):
"""
Generate a SLURM batch script to be submitted to the slurm scheduler to run desi_proc.
Expand All @@ -424,7 +424,6 @@ def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, nightlybi
queue: str. Queue to be used.
Options:
nightlybias: bool. Generate nightly bias from N>>1 ZEROs
runtime: str. Timeout wall clock time.
batch_opts: str. Other options to give to the slurm batch scheduler (written into the script).
timingfile: str. Specify the name of the timing file.
@@ -472,6 +471,14 @@ def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, nightlybi
ncores, nodes, runtime = determine_resources(ncameras, jobdesc.upper(), queue=queue, nexps=nexps,
forced_runtime=runtime, system_name=system_name)

#- derive from cmdline or sys.argv whether this is a nightlybias job
nightlybias = False
if cmdline is not None:
if '--nightlybias' in cmdline:
nightlybias = True
elif '--nightlybias' in sys.argv:
nightlybias = True

#- nightlybias jobs are memory limited, so throttle number of ranks
if nightlybias:
ncores = min(ncores, 8)
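
A subtlety in the detection above: the same membership test works whether cmdline is a full command string or an argv-style list, because in does substring matching on str and element matching on list. A quick check:

# Both representations satisfy the test used above.
assert '--nightlybias' in 'desi_proc --nightlybias -n 20211210 --cameras b2'
assert '--nightlybias' in ['desi_proc', '--nightlybias', '-n', '20211210']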
@@ -546,7 +553,10 @@ def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, nightlybi

fx.write('echo Starting at $(date)\n')

fx.write("export OMP_NUM_THREADS={}\n".format(threads_per_core))
if jobdesc.lower() == 'arc':
fx.write("export OMP_NUM_THREADS={}\n".format(threads_per_core))
else:
fx.write("export OMP_NUM_THREADS=1\n")

if jobdesc.lower() not in ['science', 'prestdstar', 'stdstarfit', 'poststdstar']:
if nightlybias:
@@ -560,7 +570,7 @@ def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, nightlybi
else:
if jobdesc.lower() in ['science','prestdstar']:
fx.write('\n# Do steps through skysub at full MPI parallelism\n')
srun = f'srun -N {nodes} -n {ncores} -c {threads_per_core} {cmd} --nofluxcalib'
srun = f'srun -N {nodes} -n {ncores} -c {threads_per_core} --cpu-bind=cores {cmd} --nofluxcalib'
fx.write('echo Running {}\n'.format(srun))
fx.write('{}\n'.format(srun))
if jobdesc.lower() in ['science', 'stdstarfit', 'poststdstar']:
@@ -575,7 +585,8 @@ def create_desi_proc_batch_script(night, exp, cameras, jobdesc, queue, nightlybi
threads_per_task = max(int(tot_threads / ntasks), 1)
fx.write('\n# Use less MPI parallelism for fluxcalib MP parallelism\n')
fx.write('# This should quickly skip over the steps already done\n')
srun = f'srun -N {nodes} -n {ntasks} -c {threads_per_task} {cmd} '
#- fluxcalib multiprocessing parallelism needs --cpu-bind=none (or at least not "cores")
srun = f'srun -N {nodes} -n {ntasks} -c {threads_per_task} --cpu-bind=none {cmd} '
fx.write('if [ $? -eq 0 ]; then\n')
fx.write(' echo Running {}\n'.format(srun))
fx.write(' {}\n'.format(srun))
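The science branch above thus emits two srun phases: everything through skysub at full MPI parallelism with --cpu-bind=cores, then a second invocation with fewer ranks and --cpu-bind=none so fluxcalib's multiprocessing can fan out within each task. A toy recomputation of the second phase's sizing, with illustrative numbers rather than the production values:

# Reduced-parallelism fluxcalib phase (illustrative sizes).
nodes = 2
cores_per_node = 64
threads_per_core = 2
ntasks = 10  # e.g. one task per spectrograph
tot_threads = nodes * cores_per_node * threads_per_core
threads_per_task = max(int(tot_threads / ntasks), 1)  # formula from the hunk above
srun = f'srun -N {nodes} -n {ntasks} -c {threads_per_task} --cpu-bind=none desi_proc ...'
print(srun)  # srun -N 2 -n 10 -c 25 --cpu-bind=none desi_proc ...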
