Skip to content

Commit

Permalink
update merging code with external catalog
Browse files (browse the repository at this point in the history)
  • Loading branch information
moustakas committed Feb 5, 2024
1 parent 05a4a7e commit e30193b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 30 deletions.
58 changes: 32 additions & 26 deletions bin/mpi-fastspecfit
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,30 @@ def main():
except ImportError:
comm = None

if comm is None:
rank = 0
else:
rank = comm.rank

# https://docs.nersc.gov/development/languages/python/parallel-python/#use-the-spawn-start-method
if args.mp > 1 and 'NERSC_HOST' in os.environ:
import multiprocessing
multiprocessing.set_start_method('spawn')

# check the input samplefile
if rank == 0 and args.samplefile is not None:
import fitsio
from astropy.table import Table
if not os.path.isfile(args.samplefile):
log.warning(f'{args.samplefile} does not exist.')
return
try:
sample = Table(fitsio.read(args.samplefile, columns=['SURVEY', 'PROGRAM', 'HEALPIX', 'TARGETID']))
except:
errmsg = f'Sample file {args.samplefile} missing required columns {SURVEY,PROGRAM,HEALPIX,TARGETID}'
self.log.critical(errmsg)
raise ValueError(errmsg)

if args.samplefile is None and args.coadd_type == 'healpix':
args.survey = args.survey.split(',')
args.program = args.program.split(',')
Expand Down Expand Up @@ -318,33 +337,24 @@ def main():
else:
fastfiles_to_merge = None

merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type=args.coadd_type,
survey=args.survey, program=args.program, healpix=args.healpix,
tile=args.tile, night=args.night, outdir_data=outdir_data,
fastfiles_to_merge=fastfiles_to_merge,
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
if args.samplefile is not None:
merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type='healpix',
sample=sample, merge_suffix=args.merge_suffix,
outdir_data=outdir_data, fastfiles_to_merge=fastfiles_to_merge,
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
else:
merge_fastspecfit(specprod=args.specprod, specprod_dir=specprod_dir, coadd_type=args.coadd_type,
survey=args.survey, program=args.program, healpix=args.healpix,
tile=args.tile, night=args.night, outdir_data=outdir_data,
fastfiles_to_merge=fastfiles_to_merge,
outsuffix=args.merge_suffix, mergedir=args.mergedir, overwrite=args.overwrite,
fastphot=args.fastphot, supermerge=args.mergeall, mp=args.mp)
return

if args.plan and args.makeqa is False:
if comm is None:
rank = 0
else:
rank = comm.rank
if rank == 0:
if args.samplefile is not None:
import fitsio
from astropy.table import Table
if not os.path.isfile(args.samplefile):
log.warning(f'{args.samplefile} does not exist.')
return
try:
sample = Table(fitsio.read(args.samplefile, columns=['SURVEY', 'PROGRAM', 'HEALPIX', 'TARGETID']))
except:
errmsg = f'Sample file {args.samplefile} missing required columns {SURVEY,PROGRAM,HEALPIX,TARGETID}'
self.log.critical(errmsg)
raise ValueError(errmsg)

plan(comm=comm, specprod=args.specprod, specprod_dir=specprod_dir,
sample=sample, coadd_type='healpix', makeqa=args.makeqa,
mp=args.mp, fastphot=args.fastphot,
Expand All @@ -356,10 +366,6 @@ def main():
makeqa=args.makeqa, mp=args.mp, fastphot=args.fastphot,
outdir_data=outdir_data, overwrite=args.overwrite)
else:
if args.samplefile is not None and not os.path.isfile(args.samplefile):
log.warning(f'{args.samplefile} does not exist.')
return

run_fastspecfit(args, comm=comm, fastphot=args.fastphot, specprod_dir=specprod_dir,
makeqa=args.makeqa, outdir_data=outdir_data, samplefile=args.samplefile,
templates=args.templates, templateversion=args.templateversion)
Expand Down
26 changes: 22 additions & 4 deletions py/fastspecfit/mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ def _findfiles(filedir, prefix='redrock', survey=None, program=None, healpix=Non

if merge:
redrockfiles = None
outfiles = _findfiles(outdir, prefix=outprefix, survey=survey, program=program, healpix=healpix, tile=tile, night=night, gzip=gzip)
if sample is not None: # special case of an input catalog
outfiles, _ = _findfiles(outdir, prefix=outprefix, sample=sample)
else:
outfiles = _findfiles(outdir, prefix=outprefix, survey=survey, program=program, healpix=healpix, tile=tile, night=night, gzip=gzip)
log.info(f'Found {len(outfiles)} {outprefix} files to be merged.')
elif makeqa:
redrockfiles = None
Expand Down Expand Up @@ -386,9 +389,9 @@ def _domerge(outfiles, extname='FASTSPEC', survey=None, program=None,
no_smooth_continuum=deps['NOSCORR'])

def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
healpix=None, tile=None, night=None, outsuffix=None,
healpix=None, tile=None, night=None, sample=None, outsuffix=None,
fastphot=False, specprod_dir=None, outdir_data='.',
fastfiles_to_merge=None,
fastfiles_to_merge=None, merge_suffix=None,
mergedir=None, supermerge=False, overwrite=False, mp=1):
"""Merge all the individual catalogs into a single large catalog. Runs only on
rank 0.
Expand Down Expand Up @@ -434,7 +437,22 @@ def merge_fastspecfit(specprod=None, coadd_type=None, survey=None, program=None,
log.info(f'No catalogs found: {_outfiles}')
return

if coadd_type == 'healpix':
if sample is not None:
if merge_suffix is None:
merge_suffix = 'sample'
mergefile = os.path.join(mergedir, f'{outprefix}-{specprod}-{merge_suffix}.fits')
if os.path.isfile(mergefile) and not overwrite:
log.info(f'Merged output file {mergefile} exists!')
return

_, _, outfiles, _, _ = plan(specprod=specprod, sample=sample, merge=True,
fastphot=fastphot, specprod_dir=specprod_dir,
outdir_data=outdir_data, overwrite=overwrite)
if len(outfiles) > 0:
_domerge(outfiles, extname=extname, mergefile=mergefile, outprefix=outprefix,
specprod=specprod, coadd_type=coadd_type, fastphot=fastphot, mp=mp)

elif coadd_type == 'healpix' and sample is None:
if survey is None or program is None:
log.warning(f'coadd_type={coadd_type} requires survey and program inputs.')
return
Expand Down

0 comments on commit e30193b

Please sign in to comment.