diff --git a/bin/copyprod b/bin/copyprod
index 1eaccf0f5..e1ccf4f33 100755
--- a/bin/copyprod
+++ b/bin/copyprod
@@ -1,39 +1,153 @@
 #!/usr/bin/env python
-# See top-level LICENSE.rst file for Copyright information
 
 """
-Copy a DESI production from one directory to another
-
-Currently this only copies the directory tree and the extracted frame files.
-This script will be expanded to support copying through any level of
-processing, including the metadata of how we got to that processing step.
+Create a nested directory tree with file symlinks to "copy" one prod to another
 
 Stephen Bailey
-April 2015
+March 2021
 """
 
-from __future__ import absolute_import, division, print_function
-import os
+import os, sys, glob
 import shutil
+from collections import Counter
 import optparse
 
+import numpy as np
+from astropy.table import Table
+
+from desiutil.log import get_logger
+from desispec.workflow.tableio import load_table, write_table
+
 parser = optparse.OptionParser(usage = "%prog [options] indir outdir")
-# parser.add_option("-i", "--input", type="string", help="input data")
-# parser.add_option("-x", "--xxx", help="some flag", action="store_true")
-opts, args = parser.parse_args()
+parser.add_option("--explist", help="file with NIGHT EXPID to copy")
+parser.add_option("--fullcopy", action="store_true",
+                  help="Copy files instead of linking")
+parser.add_option("--abspath", action="store_true",
+                  help="Link to absolute path instead of relative path")
+parser.add_option("--filter-exptables", action="store_true",
+                  help="Filter exposure_tables to ignore entries not in input explist")
 
+opts, args = parser.parse_args()
 inroot, outroot = args
+log = get_logger()
+
+#- Get list of NIGHT EXPID to copy
+if opts.explist is not None:
+    explist = Table.read(opts.explist, format='ascii')
+else:
+    rows = list()
+    for dirname in sorted(glob.glob(f'{inroot}/exposures/20??????/????????')):
+        nightdir, expid = os.path.split(dirname)
+        try:
+            night = int(os.path.basename(nightdir))
+            expid = int(expid)
+            rows.append((night, expid))
+        except ValueError:
+            pass
+
+    explist = Table(rows=rows, names=('NIGHT', 'EXPID'))
+
+num_nights = len(np.unique(explist['NIGHT']))
+num_expids = len(np.unique(explist['EXPID']))
+assert num_expids == len(explist)
+log.info(f'Linking {num_expids} exposures on {num_nights} nights')
+
+#- What type of copy or link are we making?
+if opts.fullcopy:
+    link = shutil.copy2
+    log.debug('Performing full copy of selected NIGHT/EXPID')
+else:
+    link = os.symlink
+    log.debug('Creating symlinks')
+
+inroot = os.path.abspath(inroot)
+outroot = os.path.abspath(outroot)
+log.info(f'Creating links in {outroot} to files in {inroot}')
 
 if not os.path.exists(outroot):
     os.makedirs(outroot)
 
-#- Copy exposures
-for indir, subdirs, filenames in os.walk(inroot+'/exposures'):
-    outdir = indir.replace(inroot, outroot)
-    print(outdir)
-    if not os.path.exists(outdir):
-        os.makedirs(outdir)
+def link_dirfiles(indir, outdir, abspath=False):
+    """
+    Create links in outdir to all files in indir (relative links by default,
+    absolute links if abspath=True, full copies if --fullcopy)
+    """
+    os.makedirs(outdir, exist_ok=True)
+    if abspath:
+        srcpath = indir
+    else:
+        srcpath = os.path.relpath(indir, outdir)
+
+    for infile in sorted(glob.glob(f'{indir}/*.*')):
+        basename = os.path.basename(infile)
+        src = os.path.join(srcpath, basename)
+        dst = os.path.join(outdir, basename)
+        link(src, dst)
+
+#- calibnight: link all files for requested nights
+for night in np.unique(explist['NIGHT']):
+    log.info(f'calibnight/{night}')
+    indir = f'{inroot}/calibnight/{night}'
+    outdir = f'{outroot}/calibnight/{night}'
+    link_dirfiles(indir, outdir, opts.abspath)
+
+#- preproc, exposures: link all files for requested night/expid
+for subdir in ['preproc', 'exposures']:
+    for night, expid in explist['NIGHT', 'EXPID']:
+        log.info(f'{subdir}/{night}/{expid:08d}')
+        indir = f'{inroot}/{subdir}/{night}/{expid:08d}'
+        outdir = f'{outroot}/{subdir}/{night}/{expid:08d}'
+        link_dirfiles(indir, outdir, opts.abspath)
+
+#- exposure tables: trim to requested exposures
+for night in np.unique(explist['NIGHT']):
+    yearmm = night//100
+    infile = f'{inroot}/exposure_tables/{yearmm}/exposure_table_{night}.csv'
+    outfile = f'{outroot}/exposure_tables/{yearmm}/exposure_table_{night}.csv'
+    os.makedirs(os.path.dirname(outfile), exist_ok=True)
+    if opts.filter_exptables:
+        exptab = load_table(infile, tabletype='exptable')
+        ignore = np.in1d(exptab['EXPID'], explist['EXPID'], invert=True)
+        if np.any(ignore):
+            msg = f'Night {night} ignoring '
+            msg += ', '.join([f'{n} {obstype}' for obstype, n in Counter(exptab['OBSTYPE'][ignore]).items()])
+            msg += ' exposures not included in explist'
+            log.warning(msg)
+
+        exptab['LASTSTEP'][ignore] = 'ignore'
+        write_table(exptab, outfile, tabletype='exptable')
+    else:
+        log.info(f'Night {night} copying exposure tables unchanged')
+        shutil.copy2(infile, outfile)
+
+
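[Editorial note, not part of the patch: the exposure-table trimming above flags rows via LASTSTEP rather than deleting them, so the output table keeps a complete record of the night. A minimal sketch of that idiom with a toy table; the column values here are hypothetical:]

import numpy as np
from collections import Counter
from astropy.table import Table

exptab = Table()
exptab['EXPID'] = [100, 101, 102, 103]
exptab['OBSTYPE'] = ['science', 'science', 'arc', 'flat']
exptab['LASTSTEP'] = np.full(4, 'all', dtype='U10')   # wide dtype so 'ignore' fits
keep_expids = [100, 102]                              # the "explist" selection

ignore = np.in1d(exptab['EXPID'], keep_expids, invert=True)
print(Counter(exptab['OBSTYPE'][ignore]))   # e.g. Counter({'science': 1, 'flat': 1})
exptab['LASTSTEP'][ignore] = 'ignore'       # rows kept, but flagged for downstream steps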
+#- processing tables: trim to requested exposures.
+#- Disabled for now while we figure out exactly what we want.
+# for night in np.unique(explist['NIGHT']):
+#     night_expids = explist['EXPID'][explist['NIGHT'] == night]
+#     tmp = glob.glob(f'{inroot}/processing_tables/processing_table_*-{night}.csv')
+#     if len(tmp) == 1:
+#         infile = tmp[0]
+#     elif len(tmp) == 0:
+#         log.error(f'Unable to find processing table for {night}')
+#         continue
+#     elif len(tmp) > 1:
+#         log.error(f'Multiple processing tables for {night} ?!?')
+#         continue
+#
+#     outfile = '{}/processing_tables/{}'.format(
+#         outroot, os.path.basename(infile))
+#     proctab = load_table(infile, tabletype='proctable')
+#
+#     #- Convert string "expid1|expid2" entries into keep or not
+#     keep = np.zeros(len(proctab), dtype=bool)
+#     for i, expids in enumerate(proctab['EXPID']):
+#         if np.any(np.in1d(expids, night_expids)):
+#             keep[i] = True
+#
+#     proctab = proctab[keep]
+#
+#     os.makedirs(os.path.dirname(outfile), exist_ok=True)
+#     write_table(proctab, outfile, tabletype='proctable')
+
+log.info(f'Production copied into {outroot}')
 
-    for name in filenames:
-        if name.startswith('frame-'):
-            shutil.copy2(indir+'/'+name, outdir+'/'+name)
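[Editorial note, not part of the patch: copyprod's default of relative links (os.path.relpath) rather than absolute ones matters if the output tree is later renamed or moved together with its parent. A minimal standalone sketch with hypothetical paths:]

import os, tempfile

root = tempfile.mkdtemp()
indir = os.path.join(root, 'prod_a', 'exposures', '20210301')
outdir = os.path.join(root, 'prod_b', 'exposures', '20210301')
os.makedirs(indir)
os.makedirs(outdir)
open(os.path.join(indir, 'frame-b0-00012345.fits'), 'w').close()

# relative link: still valid if the common parent directory is moved/renamed
src = os.path.join(os.path.relpath(indir, outdir), 'frame-b0-00012345.fits')
os.symlink(src, os.path.join(outdir, 'frame-b0-00012345.fits'))
print(os.readlink(os.path.join(outdir, 'frame-b0-00012345.fits')))
# -> ../../../prod_a/exposures/20210301/frame-b0-00012345.fits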
'all' or 'deep'") +parser.add_argument("-o", "--output", type=str, required=True, + help="output catalog filename") +parser.add_argument("--reduxdir", type=str, + help="overrides $DESI_SPECTRO_REDUX/$SPECPROD") +parser.add_argument("--first-tile", type=int, + help="First TILEID to include") +parser.add_argument("--last-tile", type=int, + help="Last TILEID to include (inclusive, not python indexing style)") +parser.add_argument("--tiles", type=int, nargs="*", + help="TILEIDs to include (space separated)") +parser.add_argument("--no-fibermap", action="store_true", help="Do not merge with fibermap") +parser.add_argument("--no-scores", action="store_true", help="Do not merge with coadd scores") +# parser.add_argument("-v", "--verbose", action="store_true", help="some flag") + +args = parser.parse_args() +log = get_logger() + +if args.reduxdir is None: + args.reduxdir = os.path.expandvars('$DESI_SPECTRO_REDUX/$SPECPROD') + +tiledir = f'{args.reduxdir}/tiles' +log.info(f'Looking for per-tile zbest files in {tiledir}/TILEID/{args.group}/') +assert os.path.isdir(tiledir) + +zbestfiles = list() +for filename in sorted(glob.glob(f'{tiledir}/*/{args.group}/zbest-*.fits')): + tileid = int(os.path.basename(os.path.dirname(os.path.dirname(filename)))) + if args.first_tile is not None and tileid < args.first_tile: + continue + if args.last_tile is not None and tileid > args.last_tile: + continue + if args.tiles is not None and tileid not in args.tiles: + continue + + zbestfiles.append(filename) + +nzbfiles = len(zbestfiles) +zbx = list() + +for i, filename in enumerate(zbestfiles): + print(f'{i+1}/{nzbfiles}: {filename}') + zb = fitsio.read(filename, 'ZBEST') + + tmp = os.path.basename(filename).replace('zbest-', 'coadd-', 1) + coaddfile = os.path.join(os.path.dirname(filename), tmp) + if not args.no_fibermap: + fm = fitsio.read(coaddfile, 'FIBERMAP') + #- Sorted the same + assert np.all(zb['TARGETID'] == fm['TARGETID']) + #- TARGETID is only column in common + assert (set(zb.dtype.names) & set(fm.dtype.names)) == set(['TARGETID',]) + zb = join_by('TARGETID', zb, fm) + else: + tileid = np.ones(len(zb), dtype=np.int16)*tileid + zb = rec_append_fields(zb, 'TILEID', tileid) + + if not args.no_scores: + scores = fitsio.read(coaddfile, 'SCORES') + #- Sorted the same + assert np.all(zb['TARGETID'] == scores['TARGETID']) + #- TARGETID is only column in common + assert (set(zb.dtype.names) & set(scores.dtype.names)) == set(['TARGETID',]) + zb = join_by('TARGETID', zb, scores) + + #- Handle a few dtype special cases + zb = Table(zb) + if 'NUMOBS_MORE' in zb.colnames and zb['NUMOBS_MORE'].dtype != np.dtype('>i4'): + zb['NUMOBS_MORE'] = zb['NUMOBS_MORE'].astype('>i4') + if 'RELEASE' in zb.colnames and zb['RELEASE'].dtype != np.dtype('>i2'): + zb['RELEASE'] = zb['RELEASE'].astype('>i2') + + zbx.append(zb) + +#- Determine union and intersection of columns present in the files +#- since the fiberassign datamodel evolved for exactly which targeting +#- columns were provided +all_columns = set() +joint_columns = None +for zb in zbx: + all_columns.update(zb.colnames) + if joint_columns is None: + joint_columns = set(zb.colnames) + else: + joint_columns &= set(zb.colnames) + +#- Add *_TARGET columns if needed, and drop other columns that aren't in common +add_columns = set() +drop_columns = set() +for colname in (all_columns - joint_columns): + if colname.endswith('_TARGET'): + add_columns.add(colname) + else: + drop_columns.add(colname) + +#- update individual tables to have the same columns +for zb in zbx: + for colname 
diff --git a/bin/desi_run_night b/bin/desi_run_night
index 5164a4292..30a9ba2af 100755
--- a/bin/desi_run_night
+++ b/bin/desi_run_night
@@ -21,6 +21,8 @@ def parse_args(): # options=None):
 #                                 "Overwrites the environment variable SPECPROD")
     parser.add_argument("-q", "--queue", type=str, required=False, default='realtime',
                         help="The queue to submit jobs to. Default is realtime.")
+    parser.add_argument("-r", "--reservation", type=str, required=False, default=None,
+                        help="The reservation to submit jobs to. If None, it is not submitted to a reservation.")
     parser.add_argument("--exp-table-path", type=str, required=False, default=None,
                         help="Directory name where the output exposure table should be saved.")
     parser.add_argument("--proc-table-path", type=str, required=False, default=None,
@@ -33,6 +35,8 @@ def parse_args(): # options=None):
     parser.add_argument("--error-if-not-available", action="store_true",
                         help="Raise an error instead of reporting and moving on if an exposure "+\
                              "table doesn't exist.")
+    parser.add_argument("--overwrite-existing", action="store_true",
+                        help="Give this flag if you want to submit jobs even if scripts already exist.")
 
     args = parser.parse_args()
diff --git a/bin/desi_tsnr_afterburner b/bin/desi_tsnr_afterburner
index 3e51c46be..9f6c51b33 100755
--- a/bin/desi_tsnr_afterburner
+++ b/bin/desi_tsnr_afterburner
@@ -3,12 +3,16 @@
 Calculate stand alone tsnr tables for a given desispec prod.
 '''
 
-import os
+import os,sys
 import glob
 import itertools
 import argparse
 import astropy.io.fits as fits
+import fitsio
 import numpy as np
+import multiprocessing
+import yaml
+from pkg_resources import resource_filename
 
 from desispec.io import read_sky
 from desispec.io import read_fiberflat
@@ -26,18 +30,28 @@ def parse(options=None):
     parser = argparse.ArgumentParser(
         description="Calculate template S/N ratio for exposures")
-    parser.add_argument('--outfile', type=str, default=None, required=False,
+    parser.add_argument('-o','--outfile', type=str, default=None, required=False,
                         help = 'Output summary file')
-    parser.add_argument('--details-dir', type=str, required=False,
+    parser.add_argument('--update', action = 'store_true',
+                        help = 'Update pre-existing output summary file (replace or append)')
+    parser.add_argument('--details-dir', type=str, default = None, required=False,
                         help = 'Dir. to write per-exposure per-camera files with per-fiber tSNR details')
     parser.add_argument('--prod', type = str, default = None, required=False,
-                        help = 'Path to input reduction, e.g. /global/cfs/cdirs/desi/spectro/redux/blanc/')
+                        help = 'Path to input reduction, e.g. /global/cfs/cdirs/desi/spectro/redux/blanc/, or simply the prod version, like blanc, but this requires the env. variable DESI_SPECTRO_REDUX. Default is $DESI_SPECTRO_REDUX/$SPECPROD.')
     parser.add_argument('--cameras', type = str, default = None, required=False,
                         help = 'Cameras to reduce (comma separated)')
     parser.add_argument('--expids', type = str, default = None, required=False,
                         help = 'Comma separated list of exp ids to process')
-    parser.add_argument('--night', type = int, default = None, required=False,
-                        help = 'Restrict to this night only')
+    parser.add_argument('--nights', type = str, default = None, required=False,
+                        help = 'Comma separated list of nights to process')
+    parser.add_argument('--recompute', action='store_true',
+                        help = 'Recompute TSNR values')
+    parser.add_argument('--alpha_only', action='store_true',
+                        help = 'Only compute the alpha term for tsnr.')
+    parser.add_argument('--nproc', type = int, default = 1,
+                        help = 'Number of multiprocessing processes to use')
+    parser.add_argument('--efftime-config', type = str, default = None, required=False,
+                        help = 'Use this config file instead of default data/tsnr/tsnr-efftime.yaml')
     args = None
 
     if options is None:
         args = parser.parse_args()
@@ -46,153 +60,100 @@ def parse(options=None):
     return args
 
-def main():
-    log = get_logger()
-    args=parse()
-    if args.prod is None:
-        args.prod = specprod_root()
+def compute_tsnr_values(cframe_filename,cframe_hdulist,night,expid,camera,specprod_dir, alpha_only=False) :
+    """
+    Computes TSNR values
+    Args:
+        cframe_filename: str, cframe file path
+        cframe_hdulist: astropy.fits.HDUlist object
+        night: int
+        expid: int
+        camera: str
+        specprod_dir: str, production directory
+        alpha_only: bool, set to True to only recompute alpha
 
-    log.info('outfile = {}'.format(args.outfile))
-    log.info('details-dir = {}'.format(args.details_dir))
-    log.info('prod = {}'.format(args.prod))
-    log.info('cameras = {}'.format(args.cameras))
-    log.info('expids = {}'.format(args.expids))
+    Returns: astropy.table.Table object with TSNR values
+    """
 
-    petals = np.arange(10).astype(str)
+    calib = findfile('fluxcalib', night=night, expid=expid,
+                     camera=camera, specprod_dir=specprod_dir)
+    flat = cframe_hdulist[0].header['FIBERFLT']
 
-    if args.cameras is None:
-        cameras = [x[0] + x[1] for x in itertools.product(['b', 'r', 'z'], petals.astype(np.str))]
-    else:
-        cameras = args.cameras.split(',')
-
-    if args.expids is not None:
-        expids = [np.int(x) for x in args.expids.split(',')]
+    if 'SPECPROD' in flat:
+        flat = flat.replace('SPECPROD', specprod_dir)
+    elif 'SPCALIB' in flat:
+        hdr = fitsio.read_header(cframe_filename)
+        flat = flat.replace('SPCALIB', getdep(hdr, 'DESI_SPECTRO_CALIB'))
     else:
-        expids = None
-
-    if args.night is None:
-        args.night = '*'
-
-    cframes = {}
-    for cam in cameras:
-        cframes[cam] = sorted(glob.glob('{}/exposures/{}/*/cframe-{}-*.fits'.format(args.prod, args.night, cam)))
-
-    sci_frames = {}
-
-    for cam in cameras:
-        sci_frames[cam] = []
-
-        for cframe in cframes[cam]:
-            hdul = fits.open(cframe)
-            hdr = hdul[0].header
-
-            flavor = hdr['FLAVOR']
-            prog = hdr['PROGRAM']
-            expid = hdr['EXPID']
-
-            if expids is not None:
-                if expid not in expids:
-                    continue
-
-            if flavor == 'science':
-                sci_frames[cam].append(cframe)
-
-            hdul.close()
-
-        log.info('{} science frames to reduce for {}.'.format(len(sci_frames[cam]), cam))
+        raise ValueError('Failed on flat retrieval for {}.'.format(hdr))
+
+    iin = cframe_filename.replace('cframe', 'frame')
+    sky = cframe_filename.replace('cframe', 'sky')
+    psf = cframe_filename.replace('cframe', 'psf')
+
+    frame=read_frame(iin, skip_resolution=True)
+    fiberflat=read_fiberflat(flat)
+    fluxcalib=read_flux_calibration(calib)
+    skymodel=read_sky(sky)
+
+    results, alpha = calc_tsnr2(frame, fiberflat=fiberflat,
+                                skymodel=skymodel, fluxcalib=fluxcalib, alpha_only=alpha_only)
+
+    table=Table()
+    for k in results:
+        table[k] = results[k].astype(np.float32)
+    table["TSNR2_ALPHA_"+camera[0].upper()] = np.repeat(alpha,len(frame.flux))
+
+    return table
 
-    summary_rows = list()
-    for cam in cameras:
-        for kk, x in enumerate(sci_frames[cam]):
-            cframe = fits.open(x)
-            hdr = cframe[0].header
 
-            if hdr['PROGRAM'].startswith('SV1'):
-                night = hdr['NIGHT']
-                expid = hdr['EXPID']
-                camera = hdr['CAMERA']
-                tileid = hdr['TILEID']
-
-                calib = findfile('fluxcalib', night=night, expid=expid,
-                                 camera=camera, specprod_dir=args.prod)
-
-                flat = cframe[0].header['FIBERFLT']
-
-                if 'SPECPROD' in flat:
-                    flat = flat.replace('SPECPROD', args.prod)
-
-                elif 'SPCALIB' in flat:
-                    flat = flat.replace('SPCALIB', getdep(hdr, 'DESI_SPECTRO_CALIB'))
-
-                else:
-                    raise ValueError('Failed on flat retrieval for {}.'.format(hdr))
-
-                iin = x.replace('cframe', 'frame')
-                sky = x.replace('cframe', 'sky')
-                psf = sky.replace('sky', 'psf')
-
-                if args.details_dir is not None:
-                    out = f'{args.details_dir}/{night}/{expid:08d}/tsnr-{camera}-{expid:08d}.fits'
-                else:
-                    out = None
-
-                if out is not None and os.path.exists(out):
-                    log.info(f'Using previously generated {out}')
-                    table = Table.read(out)
-                else:
-                    frame=read_frame(iin, skip_resolution=True)
-                    fiberflat=read_fiberflat(flat)
-                    fluxcalib=read_flux_calibration(calib)
-                    skymodel=read_sky(sky)
-
-                    results, alpha = calc_tsnr2(frame, fiberflat=fiberflat,
-                                                skymodel=skymodel, fluxcalib=fluxcalib)
-
-                    table=Table()
-                    for k in results:
-                        table[k] = results[k].astype(np.float32)
-
-                table['FIBER'] = frame.fibermap['FIBER']
-                table['TARGETID'] = frame.fibermap['TARGETID']
-
-                table.meta['NIGHT'] = night
-                table.meta['EXPID'] = expid
-                table.meta['TILEID'] = tileid
-                table.meta['CAMERA'] = camera
-                table.meta['TSNRALPH'] = alpha
-                table.meta['EXTNAME'] = 'TSNR2'
-
-                #- Write per-expid per-camera output if requested
-                if out is not None:
-                    Path(os.path.dirname(out)).mkdir(parents=True,exist_ok=True)
-                    tmpfile = out+'.tmp'
-                    table.write(tmpfile, format='fits', overwrite=True)
-                    os.rename(tmpfile, out)
-                    log.info('Successfully wrote {}.'.format(out))
-
-                # Append to summary.
-                entry = dict()
-                entry['NIGHT'] = np.int32(night)
-                entry['EXPID'] = np.int32(expid)
-                entry['TILEID'] = np.int32(tileid)
-                entry['CAMERA'] = camera
-                for key in table.colnames:
-                    if key.startswith('TSNR2_'):
-                        #- TSNR2_{TRACER}_{BAND} -> TSNR2_{TRACER}
-                        short_key = '_'.join(key.split('_')[0:2])
-                        entry[short_key]=np.median(table[key]).astype(np.float32)
-
-                entry['ALPHA'] = table.meta['TSNRALPH']
-
-                summary_rows.append(entry)
-
-                log.info('{} {:08d} {}: Reduced {} of {}.'.format(
-                    night, expid, cam, kk, len(sci_frames[cam])))
-
-                cframe.close()
-
-    #- If not writing a summary file, we're done
-    if args.outfile is None:
-        return
 
+
+def update_table(table1,table2,keys) :
+    """ Replace or append rows of table1 with content of table2 indexed by keys
+
+    Args:
+        table1: astropy.table.Table
+        table2: astropy.table.Table
+        keys: list of str
+
+    Returns astropy.table.Table
+    """
+
+    log = get_logger()
+
+    v1=table1[keys[0]]
+    v2=table2[keys[0]]
+    if len(keys)>1 : # I don't know of a better generic way to create a joined index
+        v1=v1.astype("str")
+        v2=v2.astype("str")
+        for k in keys[1:] :
+            v1=[i+j for i,j in zip(v1,table1[k].astype(str))]
+            v2=[i+j for i,j in zip(v2,table2[k].astype(str))]
+
+    replace = np.in1d(v1,v2)
+    keep = ~replace
+
+    log.info("keep {} entries, replace {}, add {}".format(np.sum(keep),np.sum(replace),len(table2)-np.sum(replace)))
+
+    if np.sum(keep)>0 :
+        return vstack([table1[keep],table2])
+    else :
+        return table2
+
+
+def write_summary(summary_rows,output_filename,efftime_config,preexisting_tsnr2_expid_table,preexisting_tsnr2_frame_table) :
+    """ Writes summary file.
+
+    Args:
+        summary_rows: list of dictionaries
+        output_filename: str, output filename
+        efftime_config: dictionary with scaling factors from TSNR2 to effective exp. time
+        preexisting_tsnr2_expid_table: None or astropy.table.Table, update this table if not None
+        preexisting_tsnr2_frame_table: None or astropy.table.Table, update this table if not None
+
+    Returns nothing
+    """
+    log = get_logger()
 
     #- Create camera summary table; specify names to preserve column order
     colnames = list(summary_rows[0].keys())
@@ -211,15 +172,16 @@ def main():
 
     #- Distill into exposure summary per-petal
     rows = list()
-    for night, expid, tileid in sorted(set(zip(
+    for night, expid, tileid, exptime in sorted(set(zip(
         cam_summary['NIGHT'],
         cam_summary['EXPID'],
-        cam_summary['TILEID']
+        cam_summary['TILEID'],
+        cam_summary['EXPTIME']
     ))):
         #- TILEID and NIGHT are unique to EXPID, so only filter on EXPID
         ii = (cam_summary['EXPID'] == expid)
-        row = dict(NIGHT=night, EXPID=expid, TILEID=tileid)
+        row = dict(NIGHT=night, EXPID=expid, TILEID=tileid, EXPTIME=exptime)
 
         #- Per EXPID TSNR2 is summed over cameras, averaged over petals
         for colname in tsnr2_colnames:
@@ -228,8 +190,7 @@ def main():
                 jj = ii & ispetal[petal]
                 if np.any(jj):
                     tsnr2_petal.append(np.sum(cam_summary[colname][jj]))
-
-        row[colname] = np.mean(tsnr2_petal)
+            row[colname] = np.mean(tsnr2_petal) # mean of petals (each being median of valid fibers)
 
         rows.append(row)
 
@@ -237,16 +198,264 @@ def main():
     exp_summary = Table(rows=rows, names=colnames)
     exp_summary.meta['EXTNAME'] = 'TSNR2_EXPID'
 
+    if preexisting_tsnr2_expid_table is not None :
+        log.debug("Update to preexisting")
+        exp_summary = update_table(preexisting_tsnr2_expid_table,exp_summary,["EXPID"])
+    if preexisting_tsnr2_frame_table is not None :
+        log.debug("Update to preexisting")
+        cam_summary = update_table(preexisting_tsnr2_frame_table,cam_summary,["EXPID","CAMERA"])
+
+    log.info("Add effective times from TSNR2 values")
+    exp_summary['ELG_EFFTIME_DARK'] = efftime_config["TSNR2_ELG_TO_EFFTIME_DARK"] * exp_summary['TSNR2_ELG']
+    exp_summary['BGS_EFFTIME_BRIGHT'] = efftime_config["TSNR2_BGS_TO_EFFTIME_BRIGHT"] * exp_summary['TSNR2_BGS']
+
+
     hdus = fits.HDUList()
     hdus.append(fits.convenience.table_to_hdu(exp_summary))
     hdus.append(fits.convenience.table_to_hdu(cam_summary))
 
-    tmpfile = args.outfile+'.tmp'
+    tmpfile = output_filename+'.tmp'
     hdus.writeto(tmpfile, overwrite=True)
-    os.rename(tmpfile, args.outfile)
+    os.rename(tmpfile, output_filename)
+
+    log.info('Successfully wrote {}'.format(output_filename))
+
+
+def func(prod,night,expid,camera,recompute,alpha_only,details_dir) :
+    """
+    Compute TSNR for this night/exposure/camera (code in separate function for multiprocessing)
+
+    Args:
+        prod: str, production dir name
+        night: int, night
+        expid: int, expid
+        camera: str, camera
+        recompute: bool, recompute tsnr if true even if present in frame
+        alpha_only: bool, only recompute alpha
+        details_dir: str or None, save details per frame in this directory if not None
+
+    Returns a dictionary with NIGHT,EXPID,TILEID,EXPTIME,CAMERA, and the TSNR2 values for this camera
+    """
+    log = get_logger()
+
+    cframe_filename = '{}/exposures/{}/{:08d}/cframe-{}-{:08d}.fits'.format(prod, night, expid, camera, expid)
+    if not os.path.isfile(cframe_filename) :
+        return None
+
+    cframe_hdulist = fits.open(cframe_filename)
+    hdr = cframe_hdulist[0].header
+    flavor = hdr['FLAVOR']
+    if flavor != 'science':
+        return None
+
+    log.info("Processing {}".format(cframe_filename))
+
+    prog = hdr['PROGRAM']
+    tileid = hdr['TILEID']
+
+    table = None
+    compute_tsnr = True
+
+    #- check if already computed in cframe
+    if "SCORES" in cframe_hdulist and not recompute:
Table(cframe_hdulist["SCORES"].data) + key = "TSNR2_ELG_"+camera[0].upper() + if key in table.colnames : + log.debug("Use TSNR values in cframe file") + compute_tsnr = False + + #- check if already computed in details file + table_output_filename = None + if details_dir is not None and not recompute : + table_output_filename= f'{details_dir}/{night}/{expid:08d}/tsnr-{camera}-{expid:08d}.fits' + if os.path.isfile(table_output_filename): + log.debug(f'Using previously generated {table_output_filename}') + table = Table.read(table_output_filename) + compute_tsnr = False + + #- compute tsnr + if compute_tsnr : + tsnr_table = compute_tsnr_values(cframe_filename,cframe_hdulist,night,expid,camera,specprod_dir=prod, alpha_only=alpha_only) + + if table is None : + table = tsnr_table + else : + for k in tsnr_table.columns : + table[k] = tsnr_table[k] + + fibermap = cframe_hdulist["FIBERMAP"].data + + table['FIBER'] = fibermap['FIBER'] + table['TARGETID'] = fibermap['TARGETID'] + table.meta['NIGHT'] = night + table.meta['EXPID'] = expid + table.meta['TILEID'] = tileid + table.meta['CAMERA'] = camera + table.meta['EXTNAME'] = 'TSNR2' + + #- Write per-expid per-camera output if requested + if compute_tsnr and table_output_filename is not None : + Path(os.path.dirname(table_output_filename)).mkdir(parents=True,exist_ok=True) + tmpfile = table_output_filename +'.tmp' + table.write(tmpfile, format='fits', overwrite=True) + os.rename(tmpfile, table_output_filename) + log.debug('Successfully wrote {}.'.format(table_output_filename)) + + #- Append to summary. + entry = dict() + entry['NIGHT'] = np.int32(night) + entry['EXPID'] = np.int32(expid) + entry['TILEID'] = np.int32(tileid) + entry['EXPTIME'] = np.float32(hdr['EXPTIME']) + entry['CAMERA'] = camera + for key in table.colnames: + if key.startswith('TSNR2_'): + #- TSNR2_{TRACER}_{BAND} -> TSNR2_{TRACER} + short_key = '_'.join(key.split('_')[0:2]) + ok=(table[key]!=0) + if np.sum(ok)>0 : + entry[short_key]=np.median(table[key][ok]).astype(np.float32) + else : + entry[short_key]=np.float32(0.) + if key.startswith('TSNR2_ALPHA'): + entry['TSNR2_ALPHA'] = np.median(table[key]).astype(np.float32) + + cframe_hdulist.close() + return entry + +def _func(arg) : + return func(**arg) + +def main(): + log = get_logger() + + args=parse() + if args.prod is None: + args.prod = specprod_root() + elif args.prod.find("/")<0 : + args.prod = specprod_root(args.prod) + + if not args.outfile.endswith(".fits") : + print("Output filename '{}' is incorrect. 
It has to end with '.fits'.".format(args.outfile)) + sys.exit(1) + + log.info('outfile = {}'.format(args.outfile)) + if args.details_dir is not None : log.info('details-dir = {}'.format(args.details_dir)) + + log.info('prod = {}'.format(args.prod)) + + + if args.cameras is None: + petals = np.arange(10).astype(str) + cameras = [x[0] + x[1] for x in itertools.product(['b', 'r', 'z'], petals.astype(np.str))] + else: + cameras = args.cameras.split(',') + + if args.expids is not None: + expids = [np.int(x) for x in args.expids.split(',')] + else: + expids = None + + if args.nights is None: + dirnames = sorted(glob.glob('{}/exposures/*'.format(args.prod))) + nights=[] + for dirname in dirnames : + try : + night=int(os.path.basename(dirname)) + nights.append(night) + except ValueError as e : + log.warning("ignore {}".format(dirname)) + else : + nights=[int(val) for val in args.nights.split(",")] + + log.info('cameras = {}'.format(cameras)) + log.info("nights = {}".format(nights)) + if expids is not None : log.info('expids = {}'.format(expids)) + + efftime_config_filename = args.efftime_config + if efftime_config_filename is None : + efftime_config_filename = resource_filename('desispec', 'data/tsnr/tsnr-efftime.yaml') + with open(efftime_config_filename) as f: + efftime_config = yaml.load(f, Loader=yaml.FullLoader) + log.info("Eff. time scale factors = {}".format(efftime_config)) + + preexisting_tsnr2_expid_table = None + preexisting_tsnr2_frame_table = None + if args.update and os.path.isfile(args.outfile) : + log.info("Will append pre-existing table {}".format(args.outfile)) + preexisting_tsnr2_expid_table = Table.read(args.outfile,"TSNR2_EXPID") + preexisting_tsnr2_frame_table = Table.read(args.outfile,"TSNR2_FRAME") + + + # starting computing + # one night at a time + + summary_rows = list() + + for count,night in enumerate(nights) : + + dirnames = sorted(glob.glob('{}/exposures/{}/*'.format(args.prod,night))) + night_expids=[] + for dirname in dirnames : + try : + expid=int(os.path.basename(dirname)) + night_expids.append(expid) + except ValueError as e : + log.warning("ignore {}".format(dirname)) + if expids is not None : + night_expids = np.intersect1d(expids,night_expids) + if night_expids.size == 0 : + continue + log.info("{} {}".format(night,night_expids)) + + #pool = multiprocessing.Pool(ncpu) + func_args = [] + for expid in night_expids : + for camera in cameras: + func_args.append({'prod':args.prod,'night':night,'expid':expid,'camera':camera, + 'recompute':args.recompute,'alpha_only':args.alpha_only,'details_dir':args.details_dir}) + + if args.nproc == 1 : + for func_arg in func_args : + entry = func(**func_arg) + if entry is not None : + summary_rows.append(entry) + else : + log.info("Multiprocessing with {} procs".format(args.nproc)) + pool = multiprocessing.Pool(args.nproc) + results = pool.map(_func, func_args) + for entry in results : + if entry is not None : + summary_rows.append(entry) + pool.close() + pool.join() + + + # write result after every other night + if len(summary_rows)>0 : + tmpfilename=args.outfile.replace(".fits","_tmp.fits") + write_summary(summary_rows,tmpfilename,efftime_config,preexisting_tsnr2_expid_table,preexisting_tsnr2_frame_table) + log.info("wrote {} entries in tmp file {}".format(len(summary_rows),tmpfilename)) + + + # end of loop on nights + if len(summary_rows)>0 : + write_summary(summary_rows,args.outfile,efftime_config,preexisting_tsnr2_expid_table,preexisting_tsnr2_frame_table) + + # remove temporary file if successful + if 
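[Editorial note, not part of the patch: the --update option above relies on update_table(), where rows of the old summary whose key reappears in the new one are replaced and the rest are kept. A toy sketch of that replace-or-append logic; table contents are hypothetical:]

import numpy as np
from astropy.table import Table, vstack

old = Table(dict(EXPID=[1, 2, 3], TSNR2_ELG=[10., 20., 30.]))
new = Table(dict(EXPID=[2, 4], TSNR2_ELG=[21., 40.]))

replace = np.in1d(old['EXPID'], new['EXPID'])   # old rows superseded by new
merged = vstack([old[~replace], new])           # keep the rest, append new
print(sorted(merged['EXPID']))                  # [1, 2, 3, 4], EXPID 2 updated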
diff --git a/bin/plot_frame b/bin/plot_frame
index 28c3e489e..922cd950d 100755
--- a/bin/plot_frame
+++ b/bin/plot_frame
@@ -12,6 +12,7 @@ from desispec.util import parse_fibers
 from desispec.qproc.io import read_qframe
 from desispec.io import read_fibermap,read_frame
 from desispec.interpolation import resample_flux
+from desispec.fluxcalibration import isStdStar
 
 parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
 parser.add_argument('-i','--infile', type = str, default = None, required = True, nargs="*",
@@ -38,6 +39,7 @@ parser.add_argument('--radial', action = 'store_true', help = 'show radial focal
 parser.add_argument('--wmin', type = float, default=None, help = 'min of wavelength range for mean flux of focal plane view (only valid form Frame, not QFrame)')
 parser.add_argument('--wmax', type = float, default=None, help = 'max of wavelength range for mean flux of focal plane view (only valid form Frame, not QFrame)')
 parser.add_argument('--no-mask', action = 'store_true', help = 'ignore mask')
+parser.add_argument('--std-stars', action = 'store_true', help = 'show only the std stars')
 
 args = parser.parse_args()
 
@@ -70,6 +72,12 @@ for filename in args.infile :
     else :
         frame = read_frame(filename)
 
+    if args.std_stars :
+        selection = isStdStar(frame.fibermap)
+        fibers = frame.fibermap["FIBER"][selection]
+        print("showing standard star fibers: {}".format(fibers))
+
+
     if args.focal_plane and frame.fibermap is not None :
         x.append(frame.fibermap["FIBERASSIGN_X"])
         y.append(frame.fibermap["FIBERASSIGN_Y"])
diff --git a/doc/changes.rst b/doc/changes.rst
index bd3842049..f47a748c9 100644
--- a/doc/changes.rst
+++ b/doc/changes.rst
@@ -2,17 +2,62 @@
 desispec Change Log
 ===================
 
-0.39.1 (2021-02-16)
+0.39.4 (unreleased)
 -------------------
 
 * Add fiber crosstalk correction (PR `#1138`_).
 
 .. _`#1138`: https://github.com/desihub/desispec/issues/1138
 
+0.39.3 (2021-03-04)
+-------------------
+
+Cascades update tag for final catalog creation.
+
+Note: datamodel changes to coadd SCORES and FIBERMAP
+
+* Propagate TSNR2 into coadd SCORES; update coadd FIBERMAP columns (PR `#1166`_)
+* ``bin/desi_tsnr_afterburner`` use pre-calculated TSNR2 from frame files
+  unless requested to recalculate (PR `#1167`_).
+
+.. _`#1166`: https://github.com/desihub/desispec/issues/1166
+.. _`#1167`: https://github.com/desihub/desispec/issues/1167
+
+0.39.2 (2021-03-02)
+-------------------
+
+Cascades update tag to fix coadd and tSNR crashes, and to retroactively tag
+the ``desi_spectro_calib`` version in the desispec module file.
+
+* Processing dashboard usability updates (PR `#1152`_).
+* Undo heliocentric correction in throughput analysis not used for
+  production processing (PR `#1154`_).
+* Fix coadd crash (PR `#1163`_).
+* Fix tSNR alpha<0.8 crash (PR `#1164`_).
+* Updated desi_spectro_calib version to 0.2.4.
+
+.. _`#1152`: https://github.com/desihub/desispec/issues/1152
+.. _`#1154`: https://github.com/desihub/desispec/issues/1154
+.. _`#1163`: https://github.com/desihub/desispec/issues/1163
+.. _`#1164`: https://github.com/desihub/desispec/issues/1164
+
+0.39.1 (2021-02-23)
+-------------------
+
+Cascades update tag to add functionality for using a queue reservation and for
+debugging, without algorithmically impacting what has already been run
+with the 0.39.0 tag.
+
+* Add ``desi_run_night --reservation`` option (PR `#1145`_).
+* Fix ``desi_process_exposure --no-zero-ivar`` option (PR `#1146`_).
+
+.. _`#1145`: https://github.com/desihub/desispec/issues/1145
+.. _`#1146`: https://github.com/desihub/desispec/issues/1146
+
 0.39.0 (2021-02-16)
 -------------------
 
-Tag for software release 21.2 and Cascades run.
+Initial tag for Cascades run.
 
 Major updates:
diff --git a/etc/desispec.module b/etc/desispec.module
index 818483b4a..709e1fa64 100644
--- a/etc/desispec.module
+++ b/etc/desispec.module
@@ -76,7 +76,7 @@ setenv [string toupper $product] $PRODUCT_DIR
 #
 # Add any non-standard Module code below this point.
 #
-setenv DESI_SPECTRO_CALIB $env(DESI_ROOT)/spectro/desi_spectro_calib/0.2.1
+setenv DESI_SPECTRO_CALIB $env(DESI_ROOT)/spectro/desi_spectro_calib/0.2.4
 
 # for MPI+multiprocessing coordination
 setenv MPICH_GNI_FORK_MODE FULLCOPY
diff --git a/py/desispec/_version.py b/py/desispec/_version.py
index 034190ce5..d6d81d934 100644
--- a/py/desispec/_version.py
+++ b/py/desispec/_version.py
@@ -1 +1 @@
-__version__ = '0.39.0.dev4768'
+__version__ = '0.39.3.dev4804'
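[Editorial note, not part of the patch: the coaddition.py changes below introduce MEAN_/RMS_ columns in the coadded fibermap, where RMS_DELTA_X is sqrt(<x^2>), which includes the mean offset and is therefore not the same as np.std. A toy numerical check of that convention:]

import numpy as np

delta_x = np.array([0.3, 0.5, 0.4], dtype=np.float32)  # hypothetical per-exposure offsets
mean_dx = np.mean(delta_x)                  # 0.4
rms_dx = np.sqrt(np.mean(delta_x**2))       # ~0.408, includes the mean offset
print(mean_dx, rms_dx, np.std(delta_x))     # np.std is ~0.082, much smaller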
diff --git a/py/desispec/coaddition.py b/py/desispec/coaddition.py
index 87a663be0..6f98523ab 100644
--- a/py/desispec/coaddition.py
+++ b/py/desispec/coaddition.py
@@ -11,10 +11,7 @@ import scipy.linalg
 import scipy.sparse.linalg
 
-from astropy.table import Column
-
-# for debugging
-import astropy.io.fits as pyfits
+from astropy.table import Table, Column
 
 import multiprocessing
@@ -24,6 +21,7 @@ from desispec.spectra import Spectra
 from desispec.resolution import Resolution
 from desispec.fiberbitmasking import get_all_fiberbitmask_with_amp, get_all_nonamp_fiberbitmask_val, get_justamps_fiberbitmask
+from desispec.specscore import compute_coadd_scores
 
 def coadd_fibermap(fibermap) :
@@ -33,7 +31,6 @@ def coadd_fibermap(fibermap) :
 
     targets = np.unique(fibermap["TARGETID"])
     ntarget = targets.size
-
     jj=np.zeros(ntarget,dtype=int)
     for i,tid in enumerate(targets) :
         jj[i]=np.where(fibermap["TARGETID"]==tid)[0][0]
@@ -44,21 +41,43 @@ def coadd_fibermap(fibermap) :
     tfmap['COADD_EXPTIME'] = np.zeros(len(tfmap), dtype=np.float32) - 1
 
     # smarter values for some columns
-    for k in ['DELTA_X','DELTA_Y'] :
+    mean_cols = [
+        'DELTA_X', 'DELTA_Y',
+        'FIBER_X', 'FIBER_Y',
+        'FIBER_RA', 'FIBER_DEC',
+        'FIBERASSIGN_X', 'FIBERASSIGN_Y'
+        ]
+    rms_cols = ['DELTA_X', 'DELTA_Y'] #- rms_cols must also be in mean_cols
+    for k in mean_cols:
         if k in fibermap.colnames :
-            tfmap.rename_column(k,'MEAN_'+k)
-            xx = Column(np.zeros(ntarget))
-            tfmap.add_column(xx,name='RMS_'+k)
-    for k in ['NIGHT','EXPID','TILEID','SPECTROID','FIBER'] :
+            if k.endswith('_RA') or k.endswith('_DEC'):
+                dtype = np.float64
+            else:
+                dtype = np.float32
+            if k in mean_cols:
+                xx = Column(np.zeros(ntarget, dtype=dtype))
+                tfmap.add_column(xx,name='MEAN_'+k)
+            if k in rms_cols:
+                xx = Column(np.zeros(ntarget, dtype=dtype))
+                tfmap.add_column(xx,name='RMS_'+k)
+
+            tfmap.remove_column(k)
+
+    first_last_cols = ['NIGHT','EXPID','TILEID','SPECTROID','FIBER','MJD']
+    for k in first_last_cols:
         if k in fibermap.colnames :
+            if k in ['MJD']:
+                dtype = np.float32
+            else:
+                dtype = np.int32
             if not 'FIRST_'+k in tfmap.dtype.names :
-                xx = Column(np.arange(ntarget))
+                xx = Column(np.arange(ntarget, dtype=dtype))
                 tfmap.add_column(xx,name='FIRST_'+k)
             if not 'LAST_'+k in tfmap.dtype.names :
-                xx = Column(np.arange(ntarget))
+                xx = Column(np.arange(ntarget, dtype=dtype))
                 tfmap.add_column(xx,name='LAST_'+k)
             if not 'NUM_'+k in tfmap.dtype.names :
-                xx = Column(np.arange(ntarget))
+                xx = Column(np.arange(ntarget, dtype=np.int16))
                 tfmap.add_column(xx,name='NUM_'+k)
 
     for i,tid in enumerate(targets) :
@@ -77,25 +96,34 @@ def coadd_fibermap(fibermap) :
         tfmap['COADD_NUMEXP'][i] = np.count_nonzero(good_coadds)
         if 'EXPTIME' in fibermap.colnames :
             tfmap['COADD_EXPTIME'][i] = np.sum(fibermap['EXPTIME'][jj][good_coadds])
-        for k in ['DELTA_X','DELTA_Y'] :
+        for k in mean_cols:
             if k in fibermap.colnames :
                 vals=fibermap[k][jj]
                 tfmap['MEAN_'+k][i] = np.mean(vals)
-                tfmap['RMS_'+k][i] = np.sqrt(np.mean(vals**2)) # inc. mean offset, not same as std
-        for k in ['NIGHT','EXPID','TILEID','SPECTROID','FIBER'] :
+        for k in rms_cols:
+            if k in fibermap.colnames :
+                vals=fibermap[k][jj]
+                # RMS includes mean offset, not same as std
+                tfmap['RMS_'+k][i] = np.sqrt(np.mean(vals**2))
+
+        for k in first_last_cols:
             if k in fibermap.colnames :
                 vals=fibermap[k][jj]
                 tfmap['FIRST_'+k][i] = np.min(vals)
                 tfmap['LAST_'+k][i] = np.max(vals)
                 tfmap['NUM_'+k][i] = np.unique(vals).size
-        for k in ['FIBERASSIGN_X', 'FIBERASSIGN_Y','FIBER_RA', 'FIBER_DEC'] :
-            if k in fibermap.colnames :
-                tfmap[k][i]=np.mean(fibermap[k][jj])
+
         for k in ['FIBER_RA_IVAR', 'FIBER_DEC_IVAR','DELTA_X_IVAR', 'DELTA_Y_IVAR'] :
             if k in fibermap.colnames :
                 tfmap[k][i]=np.sum(fibermap[k][jj])
 
+    #- Remove some columns that apply to individual exp but not coadds
+    #- (even coadds of the same tile)
+    for k in ['NIGHT', 'EXPID', 'MJD', 'EXPTIME', 'NUM_ITER']:
+        if k in tfmap.colnames:
+            tfmap.remove_column(k)
+
     return tfmap
 
 def coadd(spectra, cosmics_nsig=0.) :
@@ -149,10 +177,15 @@ def coadd(spectra, cosmics_nsig=0.) :
                 ttivar = spectra.ivar[b][j]
                 good = (ttivar>0)
                 bad = ~good
+                if np.sum(good)==0 :
+                    continue
+                nbad = np.sum(bad)
                 ttflux = spectra.flux[b][j].copy()
-                ttflux[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttflux[good])
+                if nbad>0 :
+                    ttflux[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttflux[good])
                 ttivar = spectra.ivar[b][j].copy()
-                ttivar[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttivar[good])
+                if nbad>0 :
+                    ttivar[bad] = np.interp(spectra.wave[b][bad],spectra.wave[b][good],ttivar[good])
                 ttvar = 1./(ttivar+(ttivar==0))
                 ttflux[1:] = ttflux[1:]-ttflux[:-1]
                 ttvar[1:] = ttvar[1:]+ttvar[:-1]
@@ -170,19 +203,21 @@ def coadd(spectra, cosmics_nsig=0.) :
             gradvar=np.array(gradvar)
             gradivar=(gradvar>0)/np.array(gradvar+(gradvar==0))
             nspec=grad.shape[0]
-            meangrad=np.sum(gradivar*grad,axis=0)/np.sum(gradivar)
-            deltagrad=grad-meangrad
-            chi2=np.sum(gradivar*deltagrad**2,axis=0)/(nspec-1)
-
-            bad = (chi2>cosmics_nsig**2)
-            nbad = np.sum(bad)
-            if nbad>0 :
-                log.info("masking {} values for targetid={}".format(nbad,tid))
-                badindex=np.where(bad)[0]
-                for bi in badindex :
-                    k=np.argmax(gradivar[:,bi]*deltagrad[:,bi]**2)
-                    ivarjj[k,bi]=0.
-                    log.debug("masking spec {} wave={}".format(k,spectra.wave[b][bi]))
+            sgradivar=np.sum(gradivar)
+            if sgradivar>0 :
+                meangrad=np.sum(gradivar*grad,axis=0)/sgradivar
+                deltagrad=grad-meangrad
+                chi2=np.sum(gradivar*deltagrad**2,axis=0)/(nspec-1)
+
+                bad = (chi2>cosmics_nsig**2)
+                nbad = np.sum(bad)
+                if nbad>0 :
+                    log.info("masking {} values for targetid={}".format(nbad,tid))
+                    badindex=np.where(bad)[0]
+                    for bi in badindex :
+                        k=np.argmax(gradivar[:,bi]*deltagrad[:,bi]**2)
+                        ivarjj[k,bi]=0.
+                        log.debug("masking spec {} wave={}".format(k,spectra.wave[b][bi]))
 
         tivar[i]=np.sum(ivarjj,axis=0)
         tflux[i]=np.sum(ivarjj*spectra.flux[b][jj],axis=0)
@@ -206,8 +241,15 @@ def coadd(spectra, cosmics_nsig=0.) :
         spectra.mask[b] = tmask
         spectra.resolution_data[b] = trdata
 
+    if spectra.scores is not None:
+        orig_scores = Table(spectra.scores.copy())
+        orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
+    else:
+        orig_scores = None
+
     spectra.fibermap=coadd_fibermap(spectra.fibermap)
     spectra.scores=None
+    compute_coadd_scores(spectra, orig_scores, update_coadd=True)
 
 def coadd_cameras(spectra,cosmics_nsig=0.) :
@@ -369,13 +411,23 @@ def coadd_cameras(spectra,cosmics_nsig=0.) :
     bands=""
     for b in sbands :
         bands+=b
+
     if spectra.mask is not None :
         dmask={bands:mask,}
     else :
         dmask=None
     res=Spectra(bands=[bands,],wave={bands:wave,},flux={bands:flux,},ivar={bands:ivar,},mask=dmask,resolution_data={bands:rdata,},
                 fibermap=fibermap,meta=spectra.meta,extra=spectra.extra,scores=None)
+
+    if spectra.scores is not None:
+        orig_scores = spectra.scores.copy()
+        orig_scores['TARGETID'] = spectra.fibermap['TARGETID']
+    else:
+        orig_scores = None
+
+    compute_coadd_scores(res, orig_scores, update_coadd=True)
+
     return res
 
 def get_resampling_matrix(global_grid,local_grid,sparse=False):
diff --git a/py/desispec/data/tsnr/tsnr-efftime.yaml b/py/desispec/data/tsnr/tsnr-efftime.yaml
new file mode 100644
index 000000000..14a210721
--- /dev/null
+++ b/py/desispec/data/tsnr/tsnr-efftime.yaml
@@ -0,0 +1,4 @@
+# fitted on cascades data, with a new computation of tsnr including dust extinction
+# multiply the TSNR2 values by these numbers to convert them to effective times
+TSNR2_ELG_TO_EFFTIME_DARK: 8.60
+TSNR2_BGS_TO_EFFTIME_BRIGHT: 0.14
diff --git a/py/desispec/desi_proc_dashboard.py b/py/desispec/desi_proc_dashboard.py
index 628e4b9e0..f8a73e55b 100755
--- a/py/desispec/desi_proc_dashboard.py
+++ b/py/desispec/desi_proc_dashboard.py
@@ -175,9 +175,9 @@ def main(args):
     else:
         nights = [nigh.strip(' \t') for nigh in args.nights.split(',')]
 
-    tonight=what_night_is_it()
-    if str(tonight) not in nights:
-        nights.append(str(tonight))
+    #tonight=what_night_is_it()   # Disabled per Anthony's request
+    #if str(tonight) not in nights:
+    #    nights.append(str(tonight))
     nights.sort(reverse=True)
 
     nights = np.array(nights)
@@ -222,6 +222,33 @@ def main(args):
         strTable += "\t<td bgcolor='{}'>{}</td>".format(background,ctype)
     strTable += '</tr>\n</table>\n'
 
+    strTable +="""Filter By Status:
+<select id="statuslist" onchange="filterByStatus()" class='form-control'><option>ALL</option><option>processing</option><option>unprocessed</option></select>
+"""
+    # The following code filters rows by obstype and exptime. Not in use for now, but it can basically be enabled at any time.
+    #strTable +="""Filter By OBSTYPE:
+    #<select id="obstypelist" onchange="filterByObstype()" class='form-control'><option>ALL</option><option>SCIENCE</option><option>ARC</option><option>FLAT</option></select>
+    #Exptime Limit:
+    #<select id="exptimelist" onchange="filterByExptime()" class='form-control'><option>ALL</option><option>120</option><option>900</option><option>1200</option></select>
+    #"""
+
     for month, nights_in_month in nights_dict.items():
         print("Month: {}, nights: {}".format(month,nights_in_month))
         webpage = os.path.join(os.getenv('DESI_DASHBOARD'), 'links', month)
@@ -242,7 +269,6 @@ def main(args):
     strTable += _closing_str()
     with open(os.path.join(os.getenv('DESI_DASHBOARD'),args.output_name),'w') as hs:
         hs.write(strTable)
-
     ##########################
     #### Fix Permission ######
     ##########################
    cmd = 
    os.system(cmd)
 
-
-
-
 def monthly_table(tables,month):
     """
     Add a collapsible and extendable table to the html file for a specific month
@@ -322,9 +345,10 @@ def nightly_table(night,skipd_expids=set(),show_null=True,use_short_sci=False):
                                                            nbad=nbad)
     nightly_table_str= '<!--Begin {}-->\n'.format(night)
     nightly_table_str += '<table class="nightTable">\n'
-    nightly_table_str += "<tr><th>Expid</th><th>FLAVOR</th><th>OBSTYPE</th><th>EXPTIME</th><th>SPECTROGRAPHS</th><th>TILEID</th>"
-    nightly_table_str += "<th>PSF File</th><th>frame file</th><th>FFlat file</th><th>sframe file</th><th>sky file</th><th>cframe file</th><th>slurm file</th><th>log file</th></tr>"
+    # Add table
+    nightly_table_str += "<tr><th>Expid</th><th>FLAVOR</th><th>OBSTYPE</th><th>EXPTIME</th><th>SPECTROGRAPHS</th><th>TILEID</th>"
+    nightly_table_str += "<th>PSF File</th><th>frame file</th><th>FFlat file</th><th>sframe file</th><th>sky file</th><th>cframe file</th><th>slurm file</th><th>log file</th><th>status</th></tr>\n"
     nightly_table_str += main_body
     nightly_table_str += "</table>\n"
@@ -359,17 +383,22 @@ def calculate_one_night_use_file(night, use_short_sci=False):
     totals_by_type['ZERO'] = {'psf': 0, 'ff': 0, 'frame': 0, 'sframe': 0}
     totals_by_type['SCIENCE'], totals_by_type['NONE'] = totals_by_type['SKY'], totals_by_type['SKY']
 
-    #table_dir = os.path.join(os.getenv('DESI_SPECTRO_REDUX'), os.getenv('SPECPROD'),'processing_tables')
-    #file_processing = '{}/{}{}-{}.csv'.format(table_dir,'processing_table_',os.getenv('SPECPROD'),night)
+    table_dir = os.path.join(os.getenv('DESI_SPECTRO_REDUX'), os.getenv('SPECPROD'),'processing_tables')
+    file_processing = '{}/{}{}-{}.csv'.format(table_dir,'processing_table_',os.getenv('SPECPROD'),night)
     #file_unprocessed = '{}/{}{}-{}.csv'.format(table_dir,'unprocessed_table_',os.getenv('SPECPROD'),night)
     file_exptable=get_exposure_table_pathname(night)
     try: # Try reading tables first. Switch to counting files if not failed.
         d_exp = ascii.read(file_exptable, data_start=2, delimiter=',')
-        #d_processing = load_table(file_processing) # commented out temporarily, might used later
+        d_processing = load_table(file_processing)
         #d_unprocessed = load_table(file_unprocessed)
     except:
         return calculate_one_night(night, use_short_sci=use_short_sci)
+
+    expid_processing=[]
+    for expid_list in d_processing['EXPID']:
+        expid_processing += expid_list.tolist()
+    expid_processing = set(expid_processing)
 
     expids = list(d_exp['EXPID'])
     expids.sort(reverse=True)
@@ -460,6 +489,10 @@ def calculate_one_night_use_file(night, use_short_sci=False):
             hlink1 = '----'
             hlink2 = '----'
 
+        if expid in expid_processing:
+            status = 'processing'
+        else:
+            status = 'unprocessed'
 
         if row_color not in ['GOOD','NULL'] and obstype.lower() in ['arc','flat','science']:
             lognames = glob.glob(logfileglob.format(obstype.lower(), night,zfild_expid,'log'))
@@ -505,7 +538,7 @@ def calculate_one_night_use_file(night, use_short_sci=False):
                                  _str_frac( nsky, n_spgrph * n_tots['sframe']), \
                                  _str_frac( ncframes, n_spgrph * n_tots['sframe']), \
                                  hlink1, \
-                                 hlink2 ]
+                                 hlink2, status ]
 
     return output
@@ -650,7 +683,7 @@ def calculate_one_night(night, use_short_sci=False):
 
             hlink1 = _hyperlink(relpath_slurm, 'Slurm')
             hlink2 = _hyperlink(relpath_log, 'Log')
-
+        status = 'unprocessed'
         output[str(expid)] = [row_color, \
                               expid, \
                               header_info['FLAVOR'],\
@@ -665,7 +698,7 @@ def calculate_one_night(night, use_short_sci=False):
                               _str_frac( len(file_sky), n_spgrph * n_tots['sframe']), \
                               _str_frac( ncframes, n_spgrph * n_tots['sframe']), \
                               hlink1, \
-                              hlink2 ]
+                              hlink2, status ]
 
     return output
@@ -726,6 +759,28 @@ def _initialize_page(color_profile):
         text-decoration: none;
         cursor: pointer;
     }
+
+#obstypelist {
+
+  background-position: 10px 10px;
+  background-repeat: no-repeat;
+  width: 10%;
+  font-size: 16px;
+  padding: 12px 20px 12px 40px;
+  border: 1px solid #ddd;
+  margin-bottom: 12px;
+}
+#exptimelist {
+
+  background-position: 10px 10px;
+  background-repeat: no-repeat;
+  width: 10%;
+  font-size: 16px;
+  padding: 12px 20px 12px 40px;
+  border: 1px solid #ddd;
+  margin-bottom: 12px;
+}
+
     """
     for ctype,cdict in color_profile.items():
         font = cdict['font']
         background = cdict['background']
         html_page += 'table tr#'+str(ctype)+' {background-color:'+str(background)+'; color:'+str(font)+';}\n'
 
     html_page += '</style>\n'
-    html_page += '<body><h1>DESI Daily Processing Status Monitor</h1>\n'
+    html_page += '<body><h1>DESI '+os.getenv("SPECPROD").upper()+' Processing Status Monitor</h1>\n'
 
     return html_page
 
@@ -745,10 +800,16 @@ def _closing_str():
 
 def _table_row(elements,idlabel=None):
     color_profile = return_color_profile()
+    if elements[14]=='unprocessed':
+        style_str='display:none;'
+    else:
+        style_str=''
+
     if idlabel is None:
-        row_str = '<tr>'
+        row_str = '<tr style="{}">'.format(style_str)
    else:
-        row_str = '<tr id="'+str(idlabel)+'">'
+        row_str = '<tr style="'+style_str+'" id="'+str(idlabel)+'">'
+
    for elem in elements:
        chars = str(elem).split('/')
        if len(chars)==2: # m/n
@@ -761,7 +822,6 @@ def _table_row(elements,idlabel=None):
            elif chars[0]!='0' and int(chars[0])==int(chars[1]):
                row_str += _table_element_style(elem,'background-color:'+color_profile['GOOD']['background']) # Medium Aqua Green
-
            else:
                row_str += _table_element(elem)
    row_str += '</tr>'#\n'
@@ -784,7 +844,7 @@ def _str_frac(numerator,denominator):
 def _js_path(output_dir):
     return os.path.join(output_dir,'js','open_nightly_table.js')
 
-def js_import_str(output_dir):
+def js_import_str(output_dir): # Not used
     output_path = _js_path(output_dir)
     if not os.path.exists(os.path.join(output_dir,'js')):
         os.makedirs(os.path.join(output_dir,'js'))
@@ -825,7 +885,7 @@ def _write_js_script(output_path):
     with open(output_path,'w') as outjs:
         outjs.write(s)
 
-def js_str():
+def js_str(): # Used
     """
     Return the javascript script to be added to the html file
     """
@@ -855,6 +915,70 @@ def js_str():
            for (i = 0; i < coll.length; i++) {
                coll[i].nextElementSibling.style.maxHeight='0px'
            }});
+    function filterByStatus() {
+        var input, filter, table, tr, td, i;
+        input = document.getElementById("statuslist");
+        filter = input.value.toUpperCase();
+        tables = document.getElementsByClassName("nightTable")
+        for (j = 0; j < tables.length; j++){
+            table = tables[j]
+            tr = table.getElementsByTagName("tr");
+            for (i = 0; i < tr.length; i++) {
+                td = tr[i].getElementsByTagName("td")[14];
+                console.log(td)
+                if (td) {
+                    if (td.innerHTML.toUpperCase().indexOf(filter) > -1 || filter==='ALL') {
+                        tr[i].style.display = "";
+                    } else {
+                        tr[i].style.display = "none";
+                    }
+                }
+            }
+        }}
+
+    function filterByObstype() {
+        var input, filter, table, tr, td, i;
+        input = document.getElementById("obstypelist");
+        filter = input.value.toUpperCase();
+        tables = document.getElementsByClassName("nightTable")
+        for (j = 0; j < tables.length; j++){
+            table = tables[j]
+            tr = table.getElementsByTagName("tr");
+            for (i = 0; i < tr.length; i++) {
+                td = tr[i].getElementsByTagName("td")[2];
+                if (td) {
+                    if (td.innerHTML.toUpperCase().indexOf(filter) > -1 || filter==='ALL') {
+                        tr[i].style.display = "";
+                    } else {
+                        tr[i].style.display = "none";
+                    }
+                }
+            }
+        }}
+
+    function filterByExptime() {
+        var input, filter, table, tr, td, i;
+        input = document.getElementById("exptimelist");
+        filter = input.value.toUpperCase();
+        tables = document.getElementsByClassName("nightTable")
+        for (j = 0; j < tables.length; j++){
+            table = tables[j]
+            tr = table.getElementsByTagName("tr");
+            for (i = 0; i < tr.length; i++) {
+                td = tr[i].getElementsByTagName("td")[3];
+                if (td) {
+                    if (filter==='ALL') {
+                        tr[i].style.display = "";
+                    } else if (parseInt(td.innerHTML) <= parseInt(filter)){
+                        tr[i].style.display = ""; }
+                    else {
+                        tr[i].style.display = "none";
+                    }
+                }
+            }
+        }}
+
+
     """
     return s
diff --git a/py/desispec/fluxcalibration.py b/py/desispec/fluxcalibration.py
index feddeeeb4..8b679193d 100644
--- a/py/desispec/fluxcalibration.py
+++ b/py/desispec/fluxcalibration.py
@@ -859,7 +859,7 @@ def compute_flux_calibration(frame, input_model_wave,input_model_flux,input_mode
     - then convolve it to the data resolution (the input wave grid is supposed finer than the spectral resolution)
resolution) - then iteratively - fit the mean throughput (deconvolved, this is needed because of sharp atmospheric absorption lines) - - compute broad band correction to fibers (to correct for small mis-alignement for instance) + - compute a scale factor to fibers (to correct for small mis-alignement for instance) - perform outlier rejection There is one subtelty with the relation between calibration and resolution. @@ -926,12 +926,6 @@ def add_margin_3d_dim2(iarray,margin) : log.info("Std stars fibers: {}".format(stdfibers)) stdstars = tframe[stdfibers] - - - - - - nwave=stdstars.nwave nstds=stdstars.flux.shape[0] @@ -961,6 +955,12 @@ def add_margin_3d_dim2(iarray,margin) : else : airmass = 1.0 + # apply heliocentric/barocentric correction if exists + if 'HELIOCOR' in stdstars.meta : + heliocor = stdstars.meta['HELIOCOR'] + log.info("Also apply heliocentric correction scale factor {} to calibration".format(heliocor)) + acal.wave *= heliocor # now the calibration wave are also in the solar system frame + # interpolate wavelength fluxcal = np.interp(stdstars.wave,acal.wave,acal.value(airmass=airmass,seeing=1.1),left=0,right=0) @@ -998,12 +998,9 @@ def add_margin_3d_dim2(iarray,margin) : calib = (convolved_model_flux!=0)*(stdstars.flux/(convolved_model_flux + (convolved_model_flux==0))) median_calib = np.median(calib, axis=0) - - # First fit of smooth correction per fiber, and 10% model error to variance, and perform first outlier rejection - fit_smooth_correction = True - smooth_fiber_correction=np.ones((stdstars.flux.shape)) + # Fit one normalization per fiber, and 10% model error to variance, and perform first outlier rejection + scale=np.ones((nstds)) chi2=np.zeros((stdstars.flux.shape)) - badfiber=np.zeros(nstds,dtype=int) number_of_stars_with_negative_correction = 0 @@ -1015,52 +1012,33 @@ def add_margin_3d_dim2(iarray,margin) : badfiber[star] = 1 continue - if fit_smooth_correction : - M = median_calib*stdstars.R[star].dot(model_flux[star]) - - if np.any(M<=0) : - log.warning("ignore negative correction for star {}".format(star)) - number_of_stars_with_negative_correction +=1 - continue + M = median_calib*stdstars.R[star].dot(model_flux[star]) + a = np.sum(current_ivar[star]*M**2) + b = np.sum(current_ivar[star]*stdstars.flux[star]*M) + if a>0 : + scale[star] = b/a + else : + scale[star] = 0 + log.info("star {} initial scale = {}".format(star,scale[star])) + if scale[star] < 0.2 : + log.warning("ignore star {} with scale = {}".format(star,scale[star])) + badfiber[star] = 1 + current_ivar[star]=0. + continue - try: - ii = np.where(M>0.1*np.mean(M))[0] - if ii.size == 0 : - current_ivar[star]=0. - badfiber[star] = 1 - continue - pol=np.poly1d(np.polyfit(dwave[ii],stdstars.flux[star,ii]/M[ii],deg=deg,w=current_ivar[star,ii]*M[ii]**2)) - smooth_fiber_correction[star]=pol(dwave) - except ValueError : - log.warning("polynomial fit for star %d failed"%star) - current_ivar[star]=0. - badfiber[star] = 1 - continue - except numpy.linalg.LinAlgError : - log.warning("polynomial fit for star %d failed"%star) - current_ivar[star]=0. 
-            badfiber[star] = 1
-            continue
+        # add generous multiplicative error to ivar for sigma clipping (because we only want to discard cosmics or bad pixels here)
+        current_ivar[star] = (current_ivar[star]>0)/(1./(current_ivar[star] + (current_ivar[star]==0))+(0.2*stdstars.flux[star])**2)
+        chi2[star]=current_ivar[star]*(stdstars.flux[star]-scale[star]*M)**2

-        # add few percent multiplicative error to ivar for sigma clipping
-        chi2[star]=(current_ivar[star]>0)*(stdstars.flux[star]-smooth_fiber_correction[star]*M)**2/(1./(current_ivar[star] + (current_ivar[star]==0))+(0.1*stdstars.flux[star])**2)

-        # checking indexing using mags
-        #from desispec.io.filters import load_legacy_survey_filter
-        #from astropy import units
-        #filter_response=load_legacy_survey_filter("R","N")
-        #fluxunits = 1e-17 * units.erg / units.s / units.cm**2 / units.Angstrom
-        #dummy_wave = np.linspace(3000,12000,12000-3000)
-        #dummy_flux = np.interp(dummy_wave,stdstars.wave,M,left=0,right=0)
-        #mag = filter_response.get_ab_magnitude(dummy_flux*fluxunits,dummy_wave)
-        #fmapmag = -2.5*np.log10(stdstars.fibermap["FLUX_R"][star])+22.5
-        #print("star index={} flux ratio={}".format(star,10**(0.4*(mag-fmapmag))))

-    if number_of_stars_with_negative_correction > min(1,nstds) :
-        log.warning("Disable smooth correction for this exposure")
-        for star in range(nstds) :
-            smooth_fiber_correction[star] = 1.
-        current_ivar=stdstars.ivar*(stdstars.mask==0)
-        fit_smooth_correction = False
+    # normalize to preserve the average throughput
+    # throughput = < data/(model*scale) >
+    # but we would like to have throughput = < data/model >
+    # (we don't do that directly to reduce noise)
+    # so we want to average the inverse of the scale
+    mscale=1./np.mean(1./scale[badfiber==0])
+    log.info("mean scale = {:4.3f}".format(mscale))
+    scale /= mscale
+    median_calib *= mscale

    bad=(chi2>nsig_clipping**2)
@@ -1075,7 +1053,7 @@

    nout_tot=0
-    previous_mean=0.
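An editorial aside on the hunks above: the removed per-fiber polynomial ("smooth") correction is replaced by a single multiplicative scale per standard-star fiber, which has a closed-form weighted least-squares solution, and the scales are then renormalized by their harmonic mean so the average throughput is preserved. A minimal sketch of both steps with toy arrays (not the pipeline API):

    import numpy as np

    def fit_scale(flux, ivar, M):
        # closed-form s minimizing sum(ivar * (flux - s*M)**2)
        a = np.sum(ivar * M**2)
        b = np.sum(ivar * flux * M)
        return b / a if a > 0 else 0.0

    rng = np.random.default_rng(0)
    M = np.abs(rng.normal(1.0, 0.1, size=(3, 50)))        # toy convolved models, one row per star
    flux = 0.9 * M + rng.normal(0.0, 0.01, size=M.shape)  # toy data with true scale 0.9
    scale = np.array([fit_scale(flux[i], np.ones(50), M[i]) for i in range(3)])

    # harmonic-mean normalization: preserve <data/model> instead of <data/(model*scale)>
    mscale = 1.0 / np.mean(1.0 / scale)
    scale /= mscale  # and median_calib is multiplied by mscale, as in the diff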
+
    for iteration in range(20) :

        # fit mean calibration
@@ -1084,17 +1062,14 @@
        # loop on star to handle resolution
        for star in range(nstds) :

-            if star%10==0 :
-                log.info("iter %d star %d"%(iteration,star))
-
            if badfiber[star]: continue

            R = stdstars.R[star]

            # diagonal sparse matrix with content = sqrt(ivar)*flat
-            D1.setdiag(sqrtw[star]*smooth_fiber_correction[star])
+            D1.setdiag(sqrtw[star]*scale[star])
            D2.setdiag(model_flux[star])
-            sqrtwmodelR = D1.dot(R.dot(D2)) # chi2 = sum (sqrtw*data_flux -diag(sqrtw)*smooth_fiber_correction*R*diag(model_flux)*calib )
+            sqrtwmodelR = D1.dot(R.dot(D2)) # chi2 = sum (sqrtw*data_flux -diag(sqrtw)*scale*R*diag(model_flux)*calib )

            A = A+(sqrtwmodelR.T*sqrtwmodelR).tocsr()
            B += sqrtwmodelR.T*sqrtwflux[star]
@@ -1112,8 +1087,6 @@
        B += median_calib*epsilon

        log.info("iter %d solving"%iteration)
-        ### log.debug('cond(A) {:g}'.format(np.linalg.cond(A)))
-        #calibration=cholesky_solve(A, B)
        w = np.diagonal(A)>0
        A_pos_def = A[w,:]
        A_pos_def = A_pos_def[:,w]
@@ -1133,46 +1106,27 @@
        good = np.where(wmask==0)[0]
        calibration[bad] = np.interp(bad,good,calibration[good],left=0,right=0)

-        if fit_smooth_correction :
-            log.info("iter %d fit smooth correction per fiber"%iteration)
-            # fit smooth fiberflat and compute chi2
-            for star in range(nstds) :
-                if star%10==0 :
-                    log.info("iter %d fiber %d(smooth)"%(iteration,star))
-
-                if badfiber[star]: continue
-
-                M = stdstars.R[star].dot(calibration*model_flux[star])
-
-                try:
-                    ii = np.where(M>0.1*np.mean(M))[0]
-                    if ii.size == 0 :
-                        current_ivar[star]=0.
-                        badfiber[star] = 1
-                        continue
-                    pol=np.poly1d(np.polyfit(dwave[ii],stdstars.flux[star,ii]/M[ii],deg=deg,w=current_ivar[star,ii]*M[ii]**2))
-                    smooth_fiber_correction[star]=pol(dwave)
-                except ValueError as e :
-                    log.warning("polynomial fit for star %d failed"%star)
-                    current_ivar[star]=0.
-                    badfiber[star] = 1.
-                    continue
-                except numpy.linalg.LinAlgError as e :
-                    log.warning("polynomial fit for star %d failed"%star)
-                    current_ivar[star]=0.
-                    badfiber[star] = 1.
-                    continue
-                chi2[star]=current_ivar[star]*(stdstars.flux[star]-smooth_fiber_correction[star]*M)**2
-        else :
-            log.info("iter %d compute chi2"%iteration)
-            for star in range(nstds) :
-                if star%10==0 :
-                    log.info("iter %d fiber %d(smooth)"%(iteration,star))
+        log.info("iter %d fit scale per fiber"%iteration)
+        for star in range(nstds) :

-                if badfiber[star]: continue
-                M = stdstars.R[star].dot(calibration*model_flux[star])
-                chi2[star]=current_ivar[star]*(stdstars.flux[star]-smooth_fiber_correction[star]*M)**2
+            if badfiber[star]: continue
+
+            M = stdstars.R[star].dot(calibration*model_flux[star])
+            a = np.sum(current_ivar[star]*M**2)
+            b = np.sum(current_ivar[star]*stdstars.flux[star]*M)
+            if a>0 :
+                scale[star] = b/a
+            else :
+                scale[star] = 0
+            log.debug("iter {} star {} scale = {}".format(iteration,star,scale[star]))
+            if scale[star] < 0.2 :
+                log.warning("iter {} ignore star {} with scale = {}".format(iteration,star,scale[star]))
+                badfiber[star] = 1
+                current_ivar[star]=0.
+                continue
+
+            chi2[star]=current_ivar[star]*(stdstars.flux[star]-scale[star]*M)**2

        log.info("iter {0:d} rejecting".format(iteration))
@@ -1209,46 +1163,43 @@
        if ndf>0 :
            chi2pdf=sum_chi2/ndf

-        # normalize to preserve the average throughput
-        # and throughput = < data/model/correction >
-        # and we would like to have throughput = < data/model >
-        # (we don't do that directly to reduce noise)
-        # so we want to average the inverse of the smooth correction
-        mean=1./np.nanmean(1./smooth_fiber_correction[badfiber==0],axis=0)
-        medcorr = np.median(smooth_fiber_correction,axis=1)
-        log.info("median correction = {}".format(medcorr))
+
+        # see comment above about why we perform this averaging
+        mscale=1./np.mean(1./scale[badfiber==0])

        if highest_throughput_nstars > 0 :
            log.info("keeping {} stars with highest throughput".format(highest_throughput_nstars))
-            ii=np.argsort(medcorr)[::-1][:highest_throughput_nstars]
+            ii=np.argsort(scale)[::-1][:highest_throughput_nstars]
            log.info("use those fibers = {}".format(stdfibers[ii]))
-            log.info("with median correction = {}".format(medcorr[ii]))
-            mean=1./np.nanmean(1./smooth_fiber_correction[ii][badfiber[ii]==0],axis=0)
+            log.info("with scale = {}".format(scale[ii]))
+            mscale=1./np.mean(1./scale[ii][badfiber[ii]==0])
        else :
-            mmedcorr = np.median(medcorr)
-            rmscorr = 1.4*np.median(np.abs(medcorr-mmedcorr))
-            log.info("mean rms correction = {} {}".format(mmedcorr,rmscorr))
-            bad=(np.abs(medcorr-mmedcorr)>3*rmscorr)
+            medscale = np.median(scale[badfiber==0])
+            rmscorr = 1.4*np.median(np.abs(scale[badfiber==0]-medscale))
+            log.info("iter {} mean median rms scale = {:4.3f} {:4.3f} {:4.3f}".format(iteration,mscale,medscale,rmscorr))
+            bad=(np.abs(scale-medscale)>3*rmscorr)
            if np.sum(bad)>0 :
-                good=(np.abs(medcorr-mmedcorr)<=3*rmscorr)
-                log.info("use {} stars, discarding 3 sigma outlier stars with medcorr = {}".format(np.sum(good),medcorr[bad]))
-                mean=1./np.nanmean(1./smooth_fiber_correction[good][badfiber[good]==0],axis=0)
+                good=~bad
+                log.info("iter {} use {} stars, discarding 3 sigma outlier stars with scale = {}".format(iteration,np.sum(good),scale[bad]))
+                mscale=1./np.mean(1./scale[good][badfiber[good]==0])
            else :
-                log.info("use {} stars".format(medcorr.size))
+                log.info("iter {} use {} stars".format(iteration,np.sum(badfiber==0)))

-        smooth_fiber_correction /= mean
+        scale /= mscale
+        calibration *= mscale

-        log.info("iter #%d chi2=%f ndf=%d chi2pdf=%f nout=%d mean=%f"%(iteration,sum_chi2,ndf,chi2pdf,nout_iter,np.mean(mean)))
+        log.info("iter %d chi2=%4.3f ndf=%d chi2pdf=%4.3f nout=%d mscale=%4.3f"%(iteration,sum_chi2,ndf,chi2pdf,nout_iter,mscale))

-        if nout_iter == 0 and np.max(np.abs(mean-previous_mean))<0.0001 :
+        if nout_iter == 0 and np.abs(mscale-1.)<0.0001 :
            break
-        previous_mean = mean

-    # smooth_fiber_correction does not converge exactly to one on average, so we apply its mean to the calibration
-    # (tested on sims)
-    calibration /= mean
-
-    log.info("nout tot=%d"%nout_tot)
+    for star in range(nstds) :
+        if badfiber[star]==0 :
+            log.info("star {} final scale = {}".format(star,scale[star]))
+    log.info("n stars kept = {}".format(np.sum(badfiber==0)))
+    log.info("rms of scales = {:4.3f}".format(np.std(scale[badfiber==0])))
+    log.info("n stars excluded = {}".format(np.sum(badfiber>0)))
+    log.info("n flux values excluded = %d"%nout_tot)

    # solve once again to get deconvolved variance
    #calibration,calibcovar=cholesky_solve_and_invert(A.todense(),B)
@@ -1258,7 +1209,6 @@
    calibvar=np.array(np.diagonal(calibcovar))

    # apply the mean (as in the iterative loop)
-    calibvar *= mean**2
    calibivar=(calibvar>0)/(calibvar+(calibvar==0))

    # we also want to save the convolved calibration and a calibration variance
@@ -1278,7 +1228,6 @@
    ccalibvar=np.array(np.diagonal(ccalibcovar))

    # apply the mean (as in the iterative loop)
-    ccalibvar *= mean**2
    ccalibivar=(ccalibvar>0)/(ccalibvar+(ccalibvar==0))

    # at least a few stars at each wavelength

diff --git a/py/desispec/pixgroup.py b/py/desispec/pixgroup.py
index 0c52f222c..75d2fac06 100755
--- a/py/desispec/pixgroup.py
+++ b/py/desispec/pixgroup.py
@@ -261,6 +261,43 @@ def __init__(self, bands, wave, flux, ivar, mask, resolution_data, fibermap,
        self.meta = None
        self.extra = None

+    def target_ids(self):
+        """
+        Return an array of unique target IDs.
+
+        The target IDs are sorted by the order that they first appear.
+
+        Returns (array):
+            an array of integer target IDs.
+        """
+        uniq, indices = np.unique(self.fibermap["TARGETID"], return_index=True)
+        return uniq[indices.argsort()]
+
+    def num_spectra(self):
+        """
+        Get the number of spectra contained in this group.
+
+        Returns (int):
+            Number of spectra contained in this group.
+        """
+        if self.fibermap is not None:
+            return len(self.fibermap)
+        else:
+            return 0
+
+
+    def num_targets(self):
+        """
+        Get the number of distinct targets.
+
+        Returns (int):
+            Number of unique targets with spectra in this object.
+        """
+        if self.fibermap is not None:
+            return len(np.unique(self.fibermap["TARGETID"]))
+        else:
+            return 0
+
    def __add__(self, other):
        '''
        concatenate two SpectraLite objects into one

diff --git a/py/desispec/scripts/coadd_spectra.py b/py/desispec/scripts/coadd_spectra.py
index 0d118c606..4c0c92fdb 100644
--- a/py/desispec/scripts/coadd_spectra.py
+++ b/py/desispec/scripts/coadd_spectra.py
@@ -145,9 +145,6 @@ def main(args=None):
        log.info("resampling ...")
        spectra = resample_spectra_lin_or_log(spectra, log10_step=args.log10_step, wave_min =args.wave_min, wave_max =args.wave_max, fast = args.fast, nproc = args.nproc)

-    #- Add scores (S/N, flux, etc.)
-    compute_coadd_scores(spectra, update_coadd=True)
-
    #- Add input files to header
    if spectra.meta is None:
        spectra.meta = dict()

diff --git a/py/desispec/scripts/procexp.py b/py/desispec/scripts/procexp.py
index 64554dc61..8ca8ddfea 100644
--- a/py/desispec/scripts/procexp.py
+++ b/py/desispec/scripts/procexp.py
@@ -48,7 +48,9 @@ def parse(options=None):
                        help = 'Do not compute template SNR')
    parser.add_argument('--no-xtalk', action='store_true',
                        help = 'Do not apply fiber crosstalk correction')
-
+    parser.add_argument('--alpha_only', action='store_true',
+                        help = 'Only compute alpha of tsnr calc.')
+
    args = None
    if options is None:
        args = parser.parse_args()
@@ -147,7 +149,7 @@ def main(args):

    if not args.no_tsnr:
        log.info("calculating tsnr")
-        results, alpha = calc_tsnr2(uncalibrated_frame, fiberflat=fiberflat, skymodel=skymodel, fluxcalib=fluxcalib)
+        results, alpha = calc_tsnr2(uncalibrated_frame, fiberflat=fiberflat, skymodel=skymodel, fluxcalib=fluxcalib, alpha_only=args.alpha_only)
        frame.meta['TSNRALPH'] = alpha

diff --git a/py/desispec/scripts/submit_night.py b/py/desispec/scripts/submit_night.py
index 9df1ce244..17ecb778e 100644
--- a/py/desispec/scripts/submit_night.py
+++ b/py/desispec/scripts/submit_night.py
@@ -15,11 +15,11 @@
 from desispec.workflow.procfuncs import parse_previous_tables, get_type_and_tile, \
                                        define_and_assign_dependency, create_and_submit, checkfor_and_submit_joint_job
 from desispec.workflow.queue import update_from_queue
+from desispec.workflow.desi_proc_funcs import get_desi_proc_batch_file_path

-
-def submit_night(night, proc_obstypes=None, dry_run=False, queue='realtime',
+def submit_night(night, proc_obstypes=None, dry_run=False, queue='realtime', reservation=None,
                 exp_table_path=None, proc_table_path=None, tab_filetype='csv',
-                 error_if_not_available=True):
+                 error_if_not_available=True, overwrite_existing=False):
    """
    Creates a processing table and an unprocessed table from a fully populated exposure table and submits those
    jobs for processing (unless dry_run is set).
@@ -33,9 +33,11 @@
        exp_table_path: str. Full path to where the exposure tables are stored, WITHOUT the monthly directory included.
        proc_table_path: str. Full path to where the processing tables are to be written.
        queue: str. The name of the queue to submit the jobs to. Default is "realtime".
+        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        tab_filetype: str. The file extension (without the '.') of the exposure and processing tables.
-        error_if_not_available, bool. Default is True. Raise as error if the required exposure table doesn't exist,
+        error_if_not_available: bool. Default is True. Raise an error if the required exposure table doesn't exist,
                                      otherwise prints an error and returns.
+        overwrite_existing: bool. True if you want to submit jobs even if scripts already exist.

    Returns: None.
""" @@ -63,6 +65,18 @@ def submit_night(night, proc_obstypes=None, dry_run=False, queue='realtime', name = get_processing_table_name(prodmod=night, extension=tab_filetype) proc_table_pathname = pathjoin(proc_table_path, name) + ## Check if night has already been submitted and don't submit if it has, unless told to with ignore_existing + batchdir = get_desi_proc_batch_file_path(night=night) + if not overwrite_existing: + if os.path.exists(batchdir) and len(os.listdir(batchdir)) > 0: + print(f"ERROR: Batch jobs already exist for night {night} and not given flag "+ + "overwrite_existing. Exiting this night.") + return + elif os.path.exists(proc_table_pathname): + print(f"ERROR: Processing table: {proc_table_pathname} already exists and and not "+ + "given flag overwrite_existing. Exiting this night.") + return + ## Determine where the unprocessed data table will be written unproc_table_pathname = pathjoin(proc_table_path, name.replace('processing', 'unprocessed')) @@ -115,6 +129,7 @@ def submit_night(night, proc_obstypes=None, dry_run=False, queue='realtime', internal_id, dry_run=dry_run, queue=queue, + reservation=reservation, strictly_successful=True) prow = erow_to_prow(erow) @@ -122,7 +137,7 @@ def submit_night(night, proc_obstypes=None, dry_run=False, queue='realtime', internal_id += 1 prow = define_and_assign_dependency(prow, arcjob, flatjob) print(f"\nProcessing: {prow}\n") - prow = create_and_submit(prow, dry_run=dry_run, queue=queue, strictly_successful=True) + prow = create_and_submit(prow, dry_run=dry_run, queue=queue, reservation=reservation, strictly_successful=True) ptable.add_row(prow) ## Note: Assumption here on number of flats @@ -158,6 +173,7 @@ def submit_night(night, proc_obstypes=None, dry_run=False, queue='realtime', last_not_dither, \ internal_id, dry_run=dry_run, queue=queue, + reservation=reservation, strictly_successful=True) ## All jobs now submitted, update information from job queue and save ptable = update_from_queue(ptable, start_time=nersc_start, end_time=nersc_end, dry_run=dry_run) diff --git a/py/desispec/scripts/submit_prod.py b/py/desispec/scripts/submit_prod.py index 3f8f6743e..77930cd47 100644 --- a/py/desispec/scripts/submit_prod.py +++ b/py/desispec/scripts/submit_prod.py @@ -64,7 +64,28 @@ def submit_production(production_yaml, dry_run=False, error_if_not_available=Fal conf = yaml.safe_load(open(production_yaml, 'rb')) specprod = str(conf['name']).lower() specprod = verify_variable_with_environment(var=specprod, var_name='specprod', env_name='SPECPROD') + if 'reservation' in conf: + reservation = str(conf['reservation']) + if reservation.lower() == 'none': + reservation = None + else: + reservation = None + if 'queue' in conf: + queue = conf['queue'] + else: + queue = 'realtime' + + if 'OVERWRITEEXISTING' in conf: + overwrite_existing = conf['OVERWRITEEXISTING'] + else: + overwrite_existing = False + print(f'Using queue: {queue}') + if reservation is not None: + print(f'Using reservation: {reservation}') + if overwrite_existing: + print("Ignoring the fact that files exists and submitting those nights anyway") + all_nights = get_all_nights() non_survey_nights = [] for night in all_nights: @@ -80,8 +101,8 @@ def submit_production(production_yaml, dry_run=False, error_if_not_available=Fal continue else: print(f'Processing {survey} night: {night}') - submit_night(night, proc_obstypes=None, dry_run=dry_run, queue=conf['queue'], - error_if_not_available=error_if_not_available) + submit_night(night, proc_obstypes=None, dry_run=dry_run, queue=queue, 
diff --git a/py/desispec/sky.py b/py/desispec/sky.py
index 7b60710d2..1b5977f5b 100644
--- a/py/desispec/sky.py
+++ b/py/desispec/sky.py
@@ -234,11 +234,11 @@ def compute_uniform_sky(frame, nsig_clipping=4.,max_iterations=100,model_ivar=Fa
    # B = sum_fiber sum_wave_w sqrt(ivar)[fiber,w]*flux[fiber,w] sqrtwR[fiber,wave]

    #A=scipy.sparse.lil_matrix((nwave,nwave)).tocsr()
-    A=np.zeros((nwave,nwave))
+    A=scipy.sparse.csr_matrix((nwave,nwave))
    B=np.zeros((nwave))

    # diagonal sparse matrix with content = sqrt(ivar)*flat of a given fiber
-    SD=scipy.sparse.lil_matrix((nwave,nwave))
+    SD=scipy.sparse.dia_matrix((nwave,nwave))

    # loop on fiber to handle resolution
    for fiber in range(nfibers) :
@@ -250,8 +250,9 @@
            SD.setdiag(sqrtw[fiber])

            sqrtwR = SD*R # each row r of R is multiplied by sqrtw[r]

-            A += (sqrtwR.T*sqrtwR).todense()
+            A += sqrtwR.T*sqrtwR
            B += sqrtwR.T*sqrtwflux[fiber]
+        A = A.toarray()

        log.info("iter %d solving"%iteration)
        w = A.diagonal()>0
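The sky.py change above keeps the accumulation of the normal-equation matrix sparse and densifies only once, after the per-fiber loop, instead of calling .todense() for every fiber. A small sketch of the pattern, with identity matrices standing in for the per-fiber resolution matrices frame.R:

    import numpy as np
    import scipy.sparse

    nwave, nfibers = 100, 5
    R = [scipy.sparse.identity(nwave, format='csr') for _ in range(nfibers)]  # stand-ins
    sqrtw = np.random.uniform(0.5, 1.5, size=(nfibers, nwave))

    A = scipy.sparse.csr_matrix((nwave, nwave))
    SD = scipy.sparse.dia_matrix((nwave, nwave))
    for fiber in range(nfibers):
        SD.setdiag(sqrtw[fiber])
        sqrtwR = SD * R[fiber]     # sparse times sparse stays sparse
        A = A + sqrtwR.T * sqrtwR  # accumulate without densifying
    A = A.toarray()                # densify once, after the loop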
diff --git a/py/desispec/specscore.py b/py/desispec/specscore.py
index cf69676a1..729b064dd 100644
--- a/py/desispec/specscore.py
+++ b/py/desispec/specscore.py
@@ -19,7 +19,7 @@ def _auto_detect_camera(frame) :
    elif mwave>=tophat_wave["z"][0] : return "z"
    else : return "r"

-def compute_coadd_scores(coadd, update_coadd=True):
+def compute_coadd_scores(coadd, specscores=None, update_coadd=True):
    """Compute scores for a coadded Spectra object

    Args:
@@ -27,11 +27,19 @@

    Options:
        update_coadd: if True, update coadd.scores
+        specscores: scores Table from the uncoadded spectra including a TARGETID column

    Returns tuple of dictionaries (scores, comments); see compute_frame_scores
+
+    ``specscores`` is used to update TSNR2 scores by summing inputs
    """
+    log = get_logger()
    scores = dict()
    comments = dict()
+
+    scores['TARGETID'] = coadd.target_ids()
+    comments['TARGETID'] = 'DESI Unique Target ID'
+
    if coadd.bands == ['brz']:
        #- i.e. this is a coadd across cameras
        fr = Frame(coadd.wave['brz'], coadd.flux['brz'], coadd.ivar['brz'],
@@ -54,6 +62,44 @@
        scores.update(bandscores)
        comments.update(bandcomments)

+    if specscores is not None:
+        #- Derive which TSNR2_XYZ_[BRZ] columns exist
+        tsnrkeys = list()
+        tsnrtypes = list()
+        num_targets = coadd.num_targets()
+        for colname in specscores.dtype.names:
+            if colname.startswith('TSNR2_'):
+                _, targtype, band = colname.split('_')
+                scores[colname] = np.zeros(num_targets)
+                comments[colname] = f'{targtype} {band} template (S/N)^2'
+                tsnrkeys.append(colname)
+
+                if targtype not in tsnrtypes:
+                    tsnrtypes.append(targtype)
+
+        if len(tsnrkeys) == 0:
+            log.warning('No TSNR2_* scores found to coadd')
+        else:
+            #- Add TSNR2_*_B/R/Z columns summed across exposures
+            for i, tid in enumerate(coadd.target_ids()):
+                jj = specscores['TARGETID'] == tid
+                for colname in tsnrkeys:
+                    scores[colname][i] = np.sum(specscores[colname][jj])
+
+            #- Additionally sum across B/R/Z
+            for targtype in tsnrtypes:
+                col = f'TSNR2_{targtype}'
+                scores[col] = np.zeros(num_targets)
+                comments[col] = f'{targtype} template (S/N)^2 summed over B,R,Z'
+                for band in ['B', 'R', 'Z']:
+                    colbrz = f'TSNR2_{targtype}_{band}'
+                    scores[col] += scores[colbrz]
+
+            #- convert to float32
+            for col in scores.keys():
+                if scores[col].dtype == np.float64:
+                    scores[col] = scores[col].astype(np.float32)
+
    if update_coadd:
        if hasattr(coadd, 'scores') and coadd.scores is not None:
            for key in scores:

diff --git a/py/desispec/tsnr.py b/py/desispec/tsnr.py
index 169ce9c27..2c3445824 100644
--- a/py/desispec/tsnr.py
+++ b/py/desispec/tsnr.py
@@ -11,8 +11,14 @@
 from desispec.calibfinder import findcalibfile
 from desiutil.log import get_logger
 from scipy.optimize import minimize
+from desiutil.dust import ext_odonnell

+def dust_transmission(wave,ebv):
+    Rv = 3.1
+    extinction = ext_odonnell(wave,Rv=Rv)
+    return 10**(-Rv*ebv[:,None]*extinction[None,:]/2.5)
+
 def get_ensemble(dirpath, bands, smooth=125):
    '''
    Function that takes a frame object and a bitmask and
@@ -21,15 +27,15 @@ def get_ensemble(dirpath, bands, smooth=125):
    0 in ivar and optionally flips a bit in mask.

    Args:
-        dirpath: path to the dir. with ensemble dflux files. 
+        dirpath: path to the dir. with ensemble dflux files.
        bands: bands to expect, typically [BRZ] - case ignored.

    Options:
-        smooth: Further convolve the residual ensemble flux. 
+        smooth: Further convolve the residual ensemble flux.

    Returns:
        Dictionary with keys labelling each tracer (bgs, lrg, etc.) for which each value
-        is a Spectra class instance with wave, flux for BRZ arms. Note flux is the high 
+        is a Spectra class instance with wave, flux for BRZ arms. Note flux is the high
        frequency residual for the ensemble. See doc. 4723.
    '''
    paths = glob.glob(dirpath + '/tsnr-ensemble-*.fits')
@@ -55,24 +61,24 @@
        if smooth > 0:
            flux[band] = convolve(flux[band][0,:], Box1DKernel(smooth), boundary='extend')
            flux[band] = flux[band].reshape(1, len(flux[band]))
-
+
        ensembles[tracer] = Spectra(bands, wave, flux, ivar)
-
+
    return ensembles

 def read_nea(path):
    '''
-    Read a master noise equivalent area [sq. pixel] file. 
+    Read a master noise equivalent area [sq. pixel] file.

    input:
        path: path to a master nea file for a given camera, e.g. b0.
-
+
    returns:
        nea: 2D spline object to be evaluated at (fiber, wavelength)
        angperpix: 2D spline object to be evaluated at (fiber, wavelength),
-        yielding angstrom per pixel. 
+        yielding angstrom per pixel.
    '''
-
+
    with fits.open(path, memmap=False) as fx:
        wave=fx['WAVELENGTH'].data
        angperpix=fx['ANGPERPIX'].data
@@ -87,19 +93,19 @@ def read_nea(path):

 def fb_rdnoise(fibers, frame, psf):
    '''
-    Approximate the readnoise for a given fiber (on a given camera) for the 
+    Approximate the readnoise for a given fiber (on a given camera) for the
    wavelengths present in frame. wave.

    input:
-        fibers: e.g. np.arange(500) to index fiber. 
+        fibers: e.g. np.arange(500) to index fiber.
        frame: frame instance for the given camera.
-        psf: corresponding psf instance. 
+        psf: corresponding psf instance.

    returns:
-        rdnoise: (nfiber x nwave) array with the estimated readnosie. Same 
+        rdnoise: (nfiber x nwave) array with the estimated readnoise. Same
                 units as OBSRDNA, e.g. ang per pix.
    '''
-
+
    ccdsizes = np.array(frame.meta['CCDSIZE'].split(',')).astype(np.float)

    xtrans = ccdsizes[0] / 2.
@@ -125,22 +131,22 @@ def fb_rdnoise(fibers, frame, psf):

 def var_model(rdnoise_sigma, npix_1d, angperpix, angperspecbin, fiberflat, skymodel, alpha=1.0, components=False):
    '''
-    Evaluate a model for the 1D spectral flux variance, e.g. quadrature sum of readnoise and sky components. 
+    Evaluate a model for the 1D spectral flux variance, e.g. quadrature sum of readnoise and sky components.

    input:
        rdnoise_sigma:
-        npix_1d: equivalent to (1D) nea. 
+        npix_1d: equivalent to (1D) nea.
        angperpix: Angstroms per pixel.
-        angperspecbin: Angstroms per bin. 
+        angperspecbin: Angstroms per bin.
        fiberflat: fiberflat instance
        skymodel: Sky instance.
-        alpha: empirical weighting of the rdnoise term to e.g. better fit sky fibers per exp. cam. 
+        alpha: empirical weighting of the rdnoise term to e.g. better fit sky fibers per exp. cam.
-        components: if True, return tuple of individual contributions to the variance. Else return variance. 
+        components: if True, return tuple of individual contributions to the variance. Else return variance.

-    returns: 
-        nfiber x nwave array of the expected variance. 
+    returns:
+        nfiber x nwave array of the expected variance.
    '''
-
+
    # the extraction is performed with a wavelength bin of width = angperspecbin
    # so the effective number of CCD pixels corresponding to a spectral bin width is
    npix_2d = npix_1d * (angperspecbin / angperpix)
@@ -162,6 +168,56 @@
    else:
        return alpha * rdnoise_variance + fiberflat.fiberflat * np.abs(skymodel.flux)
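To make the docstring above concrete, var_model() evaluates, per fiber and wavelength, roughly var = alpha * rdnoise_sigma^2 * npix_2d + fiberflat * |sky_flux|, with npix_2d = npix_1d * (angperspecbin / angperpix). A hedged one-function sketch (argument names follow the docstring; not the exact implementation):

    import numpy as np

    def var_model_sketch(rdnoise_sigma, npix_1d, angperpix, angperspecbin,
                         fiberflat, skyflux, alpha=1.0):
        # effective number of CCD pixels per spectral bin of width angperspecbin
        npix_2d = npix_1d * (angperspecbin / angperpix)
        rdnoise_variance = rdnoise_sigma**2 * npix_2d  # readnoise term, reweighted by alpha
        return alpha * rdnoise_variance + fiberflat * np.abs(skyflux)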
+def gen_mask(frame, skymodel, hw=5.):
+    """
+    Generate a mask for the alpha computation, masking out bright sky lines.
+
+    Args:
+        frame : uncalibrated Frame object for one camera
+        skymodel : SkyModel object
+        hw : (optional) float, half width of mask around sky lines in A
+
+    Returns an array of the same shape as the frame, where mask=1 is good and 0 is bad
+    """
+    log = get_logger()
+
+    maskfactor = np.ones_like(frame.mask, dtype=np.float)
+    maskfactor[frame.mask > 0] = 0.0
+
+    # https://github.com/desihub/desispec/blob/294cfb66428aa8be3797fd046adbd0a2267c4409/py/desispec/sky.py#L1267
+    skyline=np.array([5199.4,5578.4,5656.4,5891.4,5897.4,6302.4,6308.4,6365.4,6500.4,6546.4,\
+                      6555.4,6618.4,6663.4,6679.4,6690.4,6765.4,6831.4,6836.4,6865.4,6925.4,\
+                      6951.4,6980.4,7242.4,7247.4,7278.4,7286.4,7305.4,7318.4,7331.4,7343.4,\
+                      7360.4,7371.4,7394.4,7404.4,7440.4,7526.4,7714.4,7719.4,7752.4,7762.4,\
+                      7782.4,7796.4,7810.4,7823.4,7843.4,7855.4,7862.4,7873.4,7881.4,7892.4,\
+                      7915.4,7923.4,7933.4,7951.4,7966.4,7982.4,7995.4,8016.4,8028.4,8064.4,\
+                      8280.4,8284.4,8290.4,8298.4,8301.4,8313.4,8346.4,8355.4,8367.4,8384.4,\
+                      8401.4,8417.4,8432.4,8454.4,8467.4,8495.4,8507.4,8627.4,8630.4,8634.4,\
+                      8638.4,8652.4,8657.4,8662.4,8667.4,8672.4,8677.4,8683.4,8763.4,8770.4,\
+                      8780.4,8793.4,8829.4,8835.4,8838.4,8852.4,8870.4,8888.4,8905.4,8922.4,\
+                      8945.4,8960.4,8990.4,9003.4,9040.4,9052.4,9105.4,9227.4,9309.4,9315.4,\
+                      9320.4,9326.4,9340.4,9378.4,9389.4,9404.4,9422.4,9442.4,9461.4,9479.4,\
+                      9505.4,9521.4,9555.4,9570.4,9610.4,9623.4,9671.4,9684.4,9693.4,9702.4,\
+                      9714.4,9722.4,9740.4,9748.4,9793.4,9802.4,9814.4,9820.4])
+
+    maskfactor *= (skymodel.ivar > 0.0)
+    maskfactor *= (frame.ivar > 0.0)
+
+    if hw > 0.0:
+        log.info('TSNR Masking bright lines in alpha calc. (half width: {:.3f})'.format(hw))
+
+        for line in skyline :
+            if line<=frame.wave[0] or line>=frame.wave[-1]:
+                continue
+
+            ii=np.where((frame.wave>=line-hw)&(frame.wave<=line+hw))[0]
+
+            maskfactor[:,ii]=0.0
+
+    # Mask collimator, [4300-4500A]
+    ii=np.where((frame.wave>=4300.)&(frame.wave<=4500.))[0]
+    maskfactor[:,ii]=0.0
+
+    return maskfactor
+
 def calc_alpha(frame, fibermap, rdnoise_sigma, npix_1d, angperpix, angperspecbin, fiberflat, skymodel):
    '''
    Model Var = alpha * rdnoise component + sky.
@@ -172,51 +228,53 @@
    input:
        frame: desispec frame instance (should be uncalibrated, i.e. e/A).
        fibermap: desispec fibermap instance.
-        rdnoise_sigma: e.g. RDNOISE value per Quadrant (float). 
+        rdnoise_sigma: e.g. RDNOISE value per Quadrant (float).
        npix_1d: equivalent to 1D nea [pixels], calculated using read_nea().
        angperpix: angstroms per pixel (float),
        fiberflat: desispec fiberflat instance.
        skymodel: desispec Sky instance.
        alpha: nuisance parameter to reweight rdnoise vs sky contribution to variance (float).
-        components: if True, return individual contributions to variance, else return total variance. 
+        components: if True, return individual contributions to variance, else return total variance.

    returns:
-        alpha: nuisance parameter to reweight rdnoise vs sky contribution to variance (float), obtained 
+        alpha: nuisance parameter to reweight rdnoise vs sky contribution to variance (float), obtained
               as the best fit to the uncalibrated sky fibers VAR.
    '''
    log = get_logger()

    sky_indx = np.where(fibermap['OBJTYPE'] == 'SKY')[0]
    rd_var, sky_var = var_model(rdnoise_sigma, npix_1d, angperpix, angperspecbin, fiberflat, skymodel, alpha=1.0, components=True)

-    maskfactor = np.ones_like(frame.mask[sky_indx,:], dtype=np.float)
-    maskfactor[frame.mask[sky_indx,:] > 0] = 0.0
-
+    maskfactor = gen_mask(frame, skymodel)
+    maskfactor = maskfactor[sky_indx,:]
+
    def calc_alphavar(alpha):
        return alpha * rd_var[sky_indx,:] + sky_var[sky_indx,:]

    def alpha_X2(alpha):
        _var = calc_alphavar(alpha)
        _ivar = 1. / _var
-        X2 = (frame.ivar[sky_indx,:] - _ivar)**2.
+        X2 = np.abs(frame.ivar[sky_indx,:] - _ivar)
+
        return np.sum(X2 * maskfactor)

    res = minimize(alpha_X2, x0=[1.])
    alpha = res.x[0]

-    if 0.8 < alpha < 1.0:
-        log.warning(f'tSNR alpha {alpha:.4f} < 1.0')
-    elif alpha <= 0.8:
-        msg = f'tSNR alpha {alpha:.4f} < 0.8'
-        log.error(msg)
-        raise ValueError(msg)
-
+    #- From JG PR #1164:
+    # Noisy values of alpha can occur for observations dominated by sky noise
+    # where it is not possible to calibrate the read noise. For those
+    # exposures, the precise value of alpha does not impact the SNR estimation.
+    if alpha < 0.8 :
+        log.warning(f'tSNR forcing best fit alpha = {alpha:.4f} to 0.8')
+        alpha = 0.8
+
    return alpha

 #- Cache files from desimodel to avoid reading them N>>1 times
 _camera_nea_angperpix = None
 _band_ensemble = None

-def calc_tsnr2(frame, fiberflat, skymodel, fluxcalib) :
+def calc_tsnr2(frame, fiberflat, skymodel, fluxcalib, alpha_only=False) :
    '''
    Compute template SNR^2 values for a given frame
@@ -231,11 +289,11 @@
    holding nfiber length array of the tsnr^2 values for this camera, and
    `alpha`, the relative weighting btwn rdnoise & sky terms to model var.

-    Note: Assumes DESIMODEL is set and up to date. 
+    Note: Assumes DESIMODEL is set and up to date.
    '''
    global _camera_nea_angperpix
    global _band_ensemble
-
+
    log=get_logger()

    if not (frame.meta["BUNIT"]=="count/Angstrom" or frame.meta["BUNIT"]=="electron/Angstrom" ) :
@@ -282,6 +340,14 @@
    fibers = np.arange(nspec)
    rdnoise = fb_rdnoise(fibers, frame, psf)

+    #
+    ebv = frame.fibermap['EBV']
+
+    if np.sum(ebv!=0)>0 :
+        log.info("TSNR MEDIAN EBV = {:.3f}".format(np.median(ebv[ebv!=0])))
+    else :
+        log.info("TSNR MEDIAN EBV = 0")
+
    # Evaluate.
    npix = nea(fibers, frame.wave)
    angperpix = angperpix(fibers, frame.wave)
@@ -291,28 +357,24 @@
        log.info('{} \t {:.3f} +- {:.3f}'.format(label.ljust(10), np.median(x), np.std(x)))
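A note on the reworked alpha fit used below: the objective is now the absolute (L1) difference between the measured and modeled inverse variance on masked sky fibers, and instead of raising ValueError for a small best-fit value, alpha is floored at 0.8. A self-contained sketch of that logic (array names hypothetical):

    import numpy as np
    from scipy.optimize import minimize

    def fit_alpha(data_ivar, rd_var, sky_var, maskfactor):
        # L1 misfit between measured ivar and modeled ivar = 1/(alpha*rd + sky)
        def alpha_X2(alpha):
            model_ivar = 1.0 / (alpha * rd_var + sky_var)
            return np.sum(np.abs(data_ivar - model_ivar) * maskfactor)
        alpha = minimize(alpha_X2, x0=[1.0]).x[0]
        return max(alpha, 0.8)  # floor noisy values instead of raising, per the JG comment

    rd_var = np.full((4, 100), 1.0)
    sky_var = np.full((4, 100), 2.0)
    data_ivar = 1.0 / (0.9 * rd_var + sky_var)  # fake data with true alpha = 0.9
    print(fit_alpha(data_ivar, rd_var, sky_var, np.ones_like(rd_var)))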
    # Relative weighting between rdnoise & sky terms to model var.
-    try:
-        alpha = calc_alpha(frame, fibermap=frame.fibermap,
-                           rdnoise_sigma=rdnoise, npix_1d=npix,
-                           angperpix=angperpix, angperspecbin=angperspecbin,
-                           fiberflat=fiberflat, skymodel=skymodel)
-        log.info(f"TSNR ALPHA = {alpha:.3f}")
-    except ValueError:
-        log.error(f'Bad alpha={alpha:.4f} value; setting tSNR=0.0')
-        results=dict()
-        for tracer in ensemble.keys():
-            key = 'TSNR2_{}_{}'.format(tracer.upper(), band.upper())
-            results[key]=np.zeros(nspec)
-
-        return results, alpha
+    alpha = calc_alpha(frame, fibermap=frame.fibermap,
+                       rdnoise_sigma=rdnoise, npix_1d=npix,
+                       angperpix=angperpix, angperspecbin=angperspecbin,
+                       fiberflat=fiberflat, skymodel=skymodel)
+
+    log.info(f"TSNR ALPHA = {alpha:.6f}")
+
+    if alpha_only:
+        return {}, alpha

    maskfactor = np.ones_like(frame.mask, dtype=np.float)
    maskfactor[frame.mask > 0] = 0.0
-
+    maskfactor *= (frame.ivar > 0.0)
+
    tsnrs = {}

    denom = var_model(rdnoise, npix, angperpix, angperspecbin, fiberflat, skymodel, alpha=alpha)
-
+
    for tracer in ensemble.keys():
        wave = ensemble[tracer].wave[band]
        dflux = ensemble[tracer].flux[band]
@@ -332,10 +394,14 @@

        # Wavelength dependent fiber flat; Multiply or divide - check with Julien.
        result = dflux * fiberflat.fiberflat
+
+        # Apply dust transmission.
+        result *= dust_transmission(frame.wave, ebv)
+
        result = result**2.

        result /= denom
-
+
        # Eqn. (1) of https://desi.lbl.gov/DocDB/cgi-bin/private/RetrieveFile?docid=4723;filename=sky-monitor-mc-study-v1.pdf;version=2
        tsnrs[tracer] = np.sum(result * maskfactor, axis=1)
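To illustrate the new dust term in calc_tsnr2(): dust_transmission() (added at the top of this file) returns a per-fiber transmission curve that is identically 1 where E(B-V)=0, so unreddened fibers are unchanged. A quick check, assuming desiutil is importable:

    import numpy as np
    from desiutil.dust import ext_odonnell

    def dust_transmission(wave, ebv):   # as added in this diff
        Rv = 3.1
        extinction = ext_odonnell(wave, Rv=Rv)
        return 10**(-Rv * ebv[:, None] * extinction[None, :] / 2.5)

    wave = np.linspace(3600.0, 9800.0, 200)  # toy wavelength grid [Angstrom]
    ebv = np.array([0.0, 0.05])              # per-fiber E(B-V), e.g. frame.fibermap['EBV']
    trans = dust_transmission(wave, ebv)     # shape (nfiber, nwave)
    assert np.allclose(trans[0], 1.0)        # no extinction for E(B-V) = 0
    assert np.all(trans[1] < 1.0)            # reddened fiber loses flux everywhere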
""" prow = create_batch_script(prow, queue=queue, dry_run=dry_run, joint=joint) - prow = submit_batch_script(prow, dry_run=dry_run, strictly_successful=strictly_successful) + prow = submit_batch_script(prow, reservation=reservation, dry_run=dry_run, strictly_successful=strictly_successful) return prow def desi_proc_command(prow, queue=None): @@ -207,7 +208,7 @@ def create_batch_script(prow, queue='realtime', dry_run=False, joint=False): return prow -def submit_batch_script(prow, dry_run=False, strictly_successful=False): +def submit_batch_script(prow, dry_run=False, reservation=None, strictly_successful=False): """ Wrapper script that takes a processing table row and three modifier keywords and submits the scripts to the Slurm scheduler. @@ -217,6 +218,7 @@ def submit_batch_script(prow, dry_run=False, strictly_successful=False): desispect.workflow.proctable.get_processing_table_column_defs() dry_run, bool. If true, this is a simulated run and the scripts will not be written or submitted. Output will relevant for testing will be printed as though scripts are being submitted. Default is False. + reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation. strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is less desirable because e.g. the sciences can run with SVN default calibrations rather than failing completely from failed calibrations. Default is False. @@ -234,6 +236,7 @@ def submit_batch_script(prow, dry_run=False, strictly_successful=False): jobname = batch_script_name(prow) dependencies = prow['LATEST_DEP_QID'] dep_list, dep_str = '', '' + if dependencies is not None: jobtype = prow['JOBDESC'] if strictly_successful: @@ -270,15 +273,18 @@ def submit_batch_script(prow, dry_run=False, strictly_successful=False): # script_path = pathjoin(batchdir, script) batchdir = get_desi_proc_batch_file_path(night=prow['NIGHT']) script_path = pathjoin(batchdir, jobname) - if dep_str == '': - current_qid = subprocess.check_output(['sbatch', '--parsable', f'{script_path}'], - stderr=subprocess.STDOUT, text=True) - else: - current_qid = subprocess.check_output(['sbatch', '--parsable',f'{dep_str}',f'{script_path}'], - stderr=subprocess.STDOUT, text=True) + batch_params = ['sbatch', '--parsable'] + if dep_str != '': + batch_params.append(f'{dep_str}') + if reservation is not None: + batch_params.append(f'--reservation={reservation}') + batch_params.append(f'{script_path}') + + current_qid = subprocess.check_output(batch_params, stderr=subprocess.STDOUT, text=True) current_qid = int(current_qid.strip(' \t\n')) - log.info(f'Submitted {jobname} with dependencies {dep_str}. Returned qid: {current_qid}') + log.info(batch_params) + log.info(f'Submitted {jobname} with dependencies {dep_str} and reservation={reservation}. Returned qid: {current_qid}') prow['LATEST_QID'] = current_qid prow['ALL_QIDS'] = np.append(prow['ALL_QIDS'],current_qid) @@ -302,7 +308,7 @@ def define_and_assign_dependency(prow, arcjob, flatjob): arcjob, Table.Row, dict, or NoneType. Processing row corresponding to psfnight for the night of the data in prow. This must contain keyword accessible values for 'INTID', and 'LATEST_QID'. If None, it assumes the dependency doesn't exist and no dependency is assigned. - flatkpb, Table.Row, dict, or NoneType. Processing row corresponding to nightlyflat for the night of the data in prow. + flatjob, Table.Row, dict, or NoneType. 
@@ -302,7 +308,7 @@ def define_and_assign_dependency(prow, arcjob, flatjob):
        arcjob, Table.Row, dict, or NoneType. Processing row corresponding to psfnight for the night of the data in prow.
                                              This must contain keyword accessible values for 'INTID', and 'LATEST_QID'.
                                              If None, it assumes the dependency doesn't exist and no dependency is assigned.
-        flatkpb, Table.Row, dict, or NoneType. Processing row corresponding to nightlyflat for the night of the data in prow.
+        flatjob, Table.Row, dict, or NoneType. Processing row corresponding to nightlyflat for the night of the data in prow.
                                               This must contain keyword accessible values for 'INTID', and 'LATEST_QID'.
                                               If None, it assumes the dependency doesn't exist and no dependency is assigned.
@@ -476,7 +482,7 @@ def parse_previous_tables(etable, ptable, night):

 def update_and_recurvsively_submit(proc_table, submits=0, resubmission_states=None,
                                   start_time=None, end_time=None,
-                                   ptab_name=None, dry_run=False):
+                                   ptab_name=None, dry_run=False,reservation=None):
    """
    Given a processing table, this loops over job rows and resubmits failed jobs (as defined by resubmission_states).
    Before submitting a job, it checks the dependencies for failures. If a dependency needs to be resubmitted, it recursively
@@ -498,6 +504,7 @@
        ptab_name, str, the full pathname where the processing table should be saved.
        dry_run, bool, whether this is a simulated run or not. If True, jobs are not actually submitted but relevant
                       information is printed to help with testing.
+        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.

    Returns:
        proc_table: Table, a table with the same rows as the input except that Slurm and jobid relevant columns have
                    been updated for those jobs that needed to be resubmitted.
@@ -515,11 +522,11 @@
        if proc_table['STATUS'][rown] in resubmission_states:
            proc_table, submits = recursive_submit_failed(rown, proc_table, \
                                                          submits, id_to_row_map, ptab_name,
-                                                          resubmission_states, dry_run)
+                                                          resubmission_states, reservation, dry_run)
    return proc_table, submits

 def recursive_submit_failed(rown, proc_table, submits, id_to_row_map, ptab_name=None,
-                            resubmission_states=None, dry_run=False):
+                            resubmission_states=None, reservation=None, dry_run=False):
    """
    Given a row of a processing table and the full processing table, this resubmits the given job.
    Before submitting a job, it checks the dependencies for failures in the processing table. If a dependency needs to
@@ -536,6 +543,7 @@
        resubmission_states, list or array of strings, each element should be a capitalized string corresponding to a
                                                       possible Slurm scheduler state, where you wish for jobs with that
                                                       outcome to be resubmitted
+        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        dry_run, bool, whether this is a simulated run or not. If True, jobs are not actually submitted but relevant
                       information is printed to help with testing.
    Returns:
@@ -557,8 +565,8 @@
    qdeps = []
    for idep in np.sort(np.atleast_1d(ideps)):
        if proc_table['STATUS'][id_to_row_map[idep]] in resubmission_states:
-            proc_table, submits = recursive_submit_failed(id_to_row_map[idep], \
-                                                          proc_table, submits, id_to_row_map)
+            proc_table, submits = recursive_submit_failed(id_to_row_map[idep], proc_table, submits,
+                                                          id_to_row_map, reservation=reservation, dry_run=dry_run)
            qdeps.append(proc_table['LATEST_QID'][id_to_row_map[idep]])

    qdeps = np.atleast_1d(qdeps)
@@ -568,7 +576,7 @@
        log.info("Error: number of qdeps should be 1 or more")
        log.info(f'Rown {rown}, ideps {ideps}')

-    proc_table[rown] = submit_batch_script(proc_table[rown], dry_run=dry_run)
+    proc_table[rown] = submit_batch_script(proc_table[rown], reservation=reservation, dry_run=dry_run)
    submits += 1

    if dry_run:
@@ -595,7 +603,7 @@
 #########################################
 ########     Joint fit     ##############
 #########################################
-def joint_fit(ptable, prows, internal_id, queue, descriptor, dry_run=False, strictly_successful=False):
+def joint_fit(ptable, prows, internal_id, queue, reservation, descriptor, dry_run=False, strictly_successful=False):
    """
    Given a set of prows, this generates a processing table row, creates a batch script, and submits the appropriate
    joint fitting job given by descriptor. If the joint fitting job is standard star fitting, the post standard star fits
@@ -608,6 +616,7 @@
               inputs to the joint fit.
        internal_id, int, the next internal id to be used for assignment (already incremented up from the last used id number used).
        queue, str. The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
+        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        descriptor, str. Description of the joint fitting job. Can either be 'science' or 'stdstarfit', 'arc' or 'psfnight',
                         or 'flat' or 'nightlyflat'.
        dry_run, bool, whether this is a simulated run or not. If True, jobs are not actually submitted but relevant
@@ -642,7 +651,8 @@
    log.info(f"Joint fit criteria found. Running {descriptor}.\n")
    joint_prow = make_joint_prow(prows, descriptor=descriptor, internal_id=internal_id)
    internal_id += 1
-    joint_prow = create_and_submit(joint_prow, queue=queue, joint=True, dry_run=dry_run, strictly_successful=strictly_successful)
+    joint_prow = create_and_submit(joint_prow, queue=queue, reservation=reservation, joint=True, dry_run=dry_run,
+                                   strictly_successful=strictly_successful)
    ptable.add_row(joint_prow)

    if descriptor == 'stdstarfit':
@@ -659,7 +669,8 @@
            row['ALL_QIDS'] = np.ndarray(shape=0).astype(int)
            internal_id += 1
            row = assign_dependency(row, joint_prow)
-            row = create_and_submit(row, queue=queue, dry_run=dry_run, strictly_successful=strictly_successful)
+            row = create_and_submit(row, queue=queue, reservation=reservation, dry_run=dry_run,
+                                    strictly_successful=strictly_successful)
            ptable.add_row(row)
    else:
        ## in dry_run, mock Slurm ID's are generated using CPU seconds.
Wait one second so we have unique ID's
@@ -672,7 +683,8 @@

 ## wrapper functions for joint fitting
-def science_joint_fit(ptable, sciences, internal_id, queue='realtime', dry_run=False, strictly_successful=False):
+def science_joint_fit(ptable, sciences, internal_id, queue='realtime',
+                      reservation=None, dry_run=False, strictly_successful=False):
    """
    Wrapper function for desispec.workflow.procfuncs.joint_fit specific to the stdstarfit joint fit.

@@ -680,11 +692,12 @@
    Arg 'sciences' is mapped to the prows argument of joint_fit. The joint_fit argument descriptor is pre-defined as 'stdstarfit'.
    """
-    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, descriptor='stdstarfit',
-                     dry_run=dry_run, strictly_successful=strictly_successful)
+    return joint_fit(ptable=ptable, prows=sciences, internal_id=internal_id, queue=queue, reservation=reservation,
+                     descriptor='stdstarfit', dry_run=dry_run, strictly_successful=strictly_successful)

-def flat_joint_fit(ptable, flats, internal_id, queue='realtime', dry_run=False, strictly_successful=False):
+def flat_joint_fit(ptable, flats, internal_id, queue='realtime',
+                   reservation=None, dry_run=False, strictly_successful=False):
    """
    Wrapper function for desispec.workflow.procfuncs.joint_fit specific to the nightlyflat joint fit.

@@ -692,11 +705,12 @@
    Arg 'flats' is mapped to the prows argument of joint_fit. The joint_fit argument descriptor is pre-defined as 'nightlyflat'.
    """
-    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, descriptor='nightlyflat',
-                     dry_run=dry_run, strictly_successful=strictly_successful)
+    return joint_fit(ptable=ptable, prows=flats, internal_id=internal_id, queue=queue, reservation=reservation,
+                     descriptor='nightlyflat', dry_run=dry_run, strictly_successful=strictly_successful)

-def arc_joint_fit(ptable, arcs, internal_id, queue='realtime', dry_run=False, strictly_successful=False):
+def arc_joint_fit(ptable, arcs, internal_id, queue='realtime',
+                  reservation=None, dry_run=False, strictly_successful=False):
    """
    Wrapper function for desispec.workflow.procfuncs.joint_fit specific to the psfnight joint fit.

@@ -704,8 +718,8 @@
    Arg 'arcs' is mapped to the prows argument of joint_fit. The joint_fit argument descriptor is pre-defined as 'psfnight'.
    """
-    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, descriptor='psfnight',
-                     dry_run=dry_run, strictly_successful=strictly_successful)
+    return joint_fit(ptable=ptable, prows=arcs, internal_id=internal_id, queue=queue, reservation=reservation,
+                     descriptor='psfnight', dry_run=dry_run, strictly_successful=strictly_successful)

 def make_joint_prow(prows, descriptor, internal_id):
@@ -751,7 +765,7 @@ def make_joint_prow(prows, descriptor, internal_id):

 def checkfor_and_submit_joint_job(ptable, arcs, flats, sciences, arcjob, flatjob, \
                                  lasttype, last_not_dither, internal_id, dry_run=False,
-                                  queue='realtime', strictly_successful=False):
+                                  queue='realtime', reservation=None, strictly_successful=False):
    """
    Takes all the state-ful data from daily processing and determines whether a joint fit needs to be submitted.
    Places the decision criteria into a single function for easier maintainability over time. These are separate from the
@@ -775,6 +789,7 @@
        dry_run, bool, whether this is a simulated run or not. If True, jobs are not actually submitted but relevant
                       information is printed to help with testing.
        queue, str. The name of the queue to submit the jobs to. If None is given the current desi_proc default is used.
+        reservation: str. The reservation to submit jobs to. If None, it is not submitted to a reservation.
        strictly_successful, bool. Whether all jobs require all inputs to have succeeded. For daily processing, this is
                                   less desirable because e.g. the sciences can run with SVN default calibrations rather
                                   than failing completely from failed calibrations. Default is False.
@@ -789,19 +804,19 @@
                     from the input such that it represents the smallest unused ID.
    """
    if lasttype == 'science' and last_not_dither:
-        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, dry_run=dry_run,
-                                                         queue=queue, strictly_successful=strictly_successful)
+        ptable, tilejob, internal_id = science_joint_fit(ptable, sciences, internal_id, dry_run=dry_run, queue=queue,
+                                                         reservation=reservation, strictly_successful=strictly_successful)
        if tilejob is not None:
            sciences = []

    elif lasttype == 'flat' and flatjob is None and len(flats) > 11:
        ## Note here we have an assumption about the number of expected flats being greater than 11
-        ptable, flatjob, internal_id = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run,
-                                                      queue=queue, strictly_successful=strictly_successful)
+        ptable, flatjob, internal_id = flat_joint_fit(ptable, flats, internal_id, dry_run=dry_run, queue=queue,
+                                                      reservation=reservation, strictly_successful=strictly_successful)
        internal_id += 1

    elif lasttype == 'arc' and arcjob is None and len(arcs) > 4:
        ## Note here we have an assumption about the number of expected arcs being greater than 4
-        ptable, arcjob, internal_id = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run,
-                                                    queue=queue, strictly_successful=strictly_successful)
+        ptable, arcjob, internal_id = arc_joint_fit(ptable, arcs, internal_id, dry_run=dry_run, queue=queue,
+                                                    reservation=reservation, strictly_successful=strictly_successful)
        internal_id += 1

    return ptable, arcjob, flatjob, sciences, internal_id

diff --git a/py/desispec/workflow/tableio.py b/py/desispec/workflow/tableio.py
index 6d763dcef..b4385efe8 100644
--- a/py/desispec/workflow/tableio.py
+++ b/py/desispec/workflow/tableio.py
@@ -233,8 +233,8 @@ def load_table(tablename=None, tabletype=None, joinsymb='|', verbose=False, proc
                       *.temp.{ext} and then moved to *.{ext}. If None, it looks up the default for tabletype. If
                       tabletype is None it uses this to try and identify the tabletype and uses that to get the default
                       column names and types.
-        tabletype, str. Used if tablename is None to get the default name for the type of table. Also used to get the
-                        column datatypes and defaults.
+        tabletype, str. 'exptable', 'proctable', or 'unproctable'. Used if tablename is None to get the default name
+                        for the type of table. Also used to get the column datatypes and defaults.
        joinsymb, str. The symbol used to join values in a list/array when saving. Should not be a comma.
        verbose, bool. Whether to give verbose amounts of information (True) or succinct/no outputs (False). Default is False.
        process_mixins, bool. Whether to look for and try to split strings into lists/arrays. The default is True.
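Finally, a usage sketch for the clarified load_table() signature (the path is hypothetical; tabletype selects the default column names and dtypes as described above):

    from desispec.workflow.tableio import load_table

    exptab = load_table(tablename='exposure_tables/202101/exposure_table_20210115.csv',
                        tabletype='exptable')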