Commit 755db3a
Merge b69c51a into df637b2
Waelthus committed Apr 5, 2022
2 parents df637b2 + b69c51a
Showing 12 changed files with 141 additions and 70 deletions.
examples/delta_extraction/desi_pk1d_fuji_produce_delta.ini: 3 changes (2 additions & 1 deletion)
@@ -3,6 +3,7 @@ overwrite = False
out dir = $out_fuji_pk1d/Delta_pk1d/
logging level console = DEBUG
logging level file = DEBUG
num processors = 4

[data]
type = DesiHealpix
@@ -14,6 +15,7 @@ lambda max = 7500.0
lambda min rest frame = 1040.0
lambda max rest frame = 1200.0
delta lambda = 0.8
#delta lambda rest frame= 2.6666666666666
lambda abs IGM = LYA
wave solution = lin
#use non-coadded spectra = True #will use the spectra files, takes way longer for I/O, coadding reasons
@@ -29,5 +31,4 @@ type = Dr16ExpectedFlux
iter out prefix = delta_attributes
num iterations = 7
use constant weight = True
num processors = 4
order = 0
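
Note: this file's diff moves "num processors" out of [expected flux] and into the general options, with 0 meaning "auto-detect". A minimal standalone sketch, not picca code, of reading the relocated option the way the classes below resolve it; the [general] section name comes from config.py:

    import multiprocessing
    from configparser import ConfigParser

    config = ConfigParser()
    config.read("desi_pk1d_fuji_produce_delta.ini")

    # "num processors" now lives in the general section; 0 means "auto"
    num_processors = config.getint("general", "num processors", fallback=0)
    if num_processors == 0:
        # same fallback the new code applies: half the available cores
        num_processors = multiprocessing.cpu_count() // 2
    print(f"running with {num_processors} worker processes")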
py/picca/delta_extraction/config.py: 19 changes (15 additions & 4 deletions)
@@ -10,6 +10,7 @@
from datetime import datetime
import git


from picca.delta_extraction.errors import ConfigError
from picca.delta_extraction.utils import class_from_string, setup_logger

@@ -24,7 +25,7 @@
accepted_data_options = ["type", "module name"]
accepted_expected_flux_options = ["type", "module name"]
accepted_general_options = ["overwrite", "logging level console",
"logging level file", "log", "out dir"]
"logging level file", "log", "out dir", "num processors"]
accepted_masks_options = ["num masks", "type {int}", "module name {int}"]

default_config = {
@@ -35,6 +36,7 @@
"log": "run.log",
"logging level console": "PROGRESS",
"logging level file": "PROGRESS",
"num processors": 0,
},
"run specs": {
"git hash": git_hash,
@@ -150,6 +152,7 @@ def __init__(self, filename):
self.__format_data_section()
self.expected_flux = None
self.__format_expected_flux_section()
self.num_processors = None

# initialize folders where data will be saved
self.initialize_folders()
@@ -291,6 +294,8 @@ def __format_data_section(self):

# add output directory
section["out dir"] = self.out_dir
if "num processors" in accepted_options:
section["num processors"] = self.num_processors

# finally add the information to self.data
self.data = (DataType, section)
@@ -340,7 +345,8 @@ def __format_expected_flux_section(self):
# add output directory if necessary
if "out dir" in accepted_options:
section["out dir"] = self.out_dir

if "num processors" in accepted_options:
section["num processors"] = self.num_processors
# finally add the information to self.continua
self.expected_flux = (ExpectedFluxType, section)

@@ -369,11 +375,11 @@ def __format_general_section(self):
self.out_dir += "/"

self.overwrite = section.getboolean("overwrite")
if self.out_dir is None:
if self.overwrite is None:
raise ConfigError("Missing variable 'overwrite' in section [general]")

self.log = section.get("log")
if self.out_dir is None:
if self.log is None:
raise ConfigError("Missing variable 'log' in section [general]")
elif not (self.log.startswith(".") or self.log.startswith("/")):
self.log = self.out_dir + "Log/" + self.log
@@ -388,6 +394,11 @@
if self.logging_level_file is None:
raise ConfigError("In section 'logging level file' in section [general]")
self.logging_level_file = self.logging_level_file.upper()

self.num_processors = section.get("num processors")
if self.num_processors is None:
raise ConfigError("Missing variable 'num processors' in section [general]")


def __format_masks_section(self):
"""Format the masks section of the parser into usable data
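
The config.py changes follow a single pattern: parse "num processors" once in __format_general_section, then copy it into the [data] and [expected flux] sections whenever those classes list it among their accepted options. The general section keeps the value as a string (section.get, not getint) because configparser sections can only store strings; the consuming class converts it later. A sketch of the propagation step, with a hypothetical accepted_options list:

    from configparser import ConfigParser

    config = ConfigParser()
    config.read_dict({
        "general": {"num processors": "4"},
        "data": {"type": "DesiHealpix"},
    })

    # what __format_general_section stores (kept as a string on purpose,
    # since configparser sections reject non-string values)
    num_processors = config["general"].get("num processors")

    # what __format_data_section then does, guarded by the accepted options
    accepted_options = ["type", "module name", "num processors"]  # hypothetical
    if "num processors" in accepted_options:
        config["data"]["num processors"] = num_processors

    print(config["data"].getint("num processors"))  # -> 4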
py/picca/delta_extraction/data_catalogues/desi_healpix.py: 79 changes (56 additions & 23 deletions)
@@ -2,6 +2,8 @@
"""
import logging
import os
import multiprocessing


import fitsio
import healpy
@@ -15,7 +17,7 @@
from picca.delta_extraction.utils_pk1d import spectral_resolution_desi, exp_diff_desi

accepted_options = sorted(
list(set(accepted_options + ["use non-coadded spectra"])))
list(set(accepted_options + ["use non-coadded spectra","num processors"])))

defaults.update({
"use non-coadded spectra": False,
@@ -96,6 +98,12 @@ def __parse_config(self, config):
raise DataError(
"Missing argument 'use non-coadded spectra' required by DesiHealpix"
)
self.num_processors = config.getint("num processors")
if self.num_processors is None:
raise DataError(
"Missing argument 'num processors' required by DesiHealpix")
if self.num_processors == 0:
self.num_processors = (multiprocessing.cpu_count() // 2)

def read_data(self):
"""Read the data.
@@ -127,29 +135,54 @@ def read_data(self):
self.catalogue.sort("HEALPIX")
grouped_catalogue = self.catalogue.group_by(["HEALPIX", "SURVEY"])

forests_by_targetid = {}
is_sv = True
for (index,
(healpix,
survey)), group in zip(enumerate(grouped_catalogue.groups.keys),
grouped_catalogue.groups):

if survey not in ["sv", "sv1", "sv2", "sv3"]:
is_sv = False

input_directory = f'{self.input_directory}/{survey}/dark'
coadd_name = "spectra" if self.use_non_coadded_spectra else "coadd"
filename = (
f"{input_directory}/{healpix//100}/{healpix}/{coadd_name}-{survey}-"
f"dark-{healpix}.fits")

#TODO: not sure if we want the dark survey to be hard coded in here, probably won't run on anything else, but still

self.logger.progress(
f"Read {index} of {len(grouped_catalogue.groups.keys)}. "
f"num_data: {len(forests_by_targetid)}")

self.read_file(filename, group, forests_by_targetid)
if self.num_processors>1:
context = multiprocessing.get_context('fork')
manager = multiprocessing.Manager()
forests_by_targetid = manager.dict()
arguments = []
for (index,
(healpix, survey)), group in zip(enumerate(grouped_catalogue.groups.keys),
grouped_catalogue.groups):

if survey not in ["sv", "sv1", "sv2", "sv3"]:
is_sv = False

input_directory = f'{self.input_directory}/{survey}/dark'
coadd_name = "spectra" if self.use_non_coadded_spectra else "coadd"
filename = (
f"{input_directory}/{healpix//100}/{healpix}/{coadd_name}-{survey}-"
f"dark-{healpix}.fits")

arguments.append((filename,group,forests_by_targetid))

self.logger.info(f"reading data from {len(arguments)} files")
with context.Pool(processes=self.num_processors) as pool:

pool.starmap(self.read_file, arguments)
else:
forests_by_targetid = {}
for (index,
(healpix,
survey)), group in zip(enumerate(grouped_catalogue.groups.keys),
grouped_catalogue.groups):

if survey not in ["sv", "sv1", "sv2", "sv3"]:
is_sv = False

input_directory = f'{self.input_directory}/{survey}/dark'
coadd_name = "spectra" if self.use_non_coadded_spectra else "coadd"
filename = (
f"{input_directory}/{healpix//100}/{healpix}/{coadd_name}-{survey}-"
f"dark-{healpix}.fits")

#TODO: not sure if we want the dark survey to be hard coded in here, probably won't run on anything else, but still

self.logger.progress(
f"Read {index} of {len(grouped_catalogue.groups.keys)}. "
f"num_data: {len(forests_by_targetid)}")

self.read_file(filename, group, forests_by_targetid)

if len(forests_by_targetid) == 0:
raise DataError("No Quasars found, stopping here")
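
The new parallel branch above combines three standard-library pieces: a "fork" start method (POSIX-only), a Manager-backed dict that all worker processes can write into, and Pool.starmap fanning out the (filename, group, forests_by_targetid) tuples. A self-contained sketch of the same pattern; the read_file body and file names are placeholders, not picca code:

    import multiprocessing

    def read_file(filename, group, forests_by_targetid):
        # placeholder for DesiHealpix.read_file: parse one healpix file and
        # record each forest in the shared dict, keyed by target id
        for targetid in group:
            forests_by_targetid[targetid] = f"forest from {filename}"

    if __name__ == "__main__":
        context = multiprocessing.get_context("fork")  # unavailable on Windows
        manager = multiprocessing.Manager()
        forests_by_targetid = manager.dict()  # proxy shared across processes

        arguments = [
            ("coadd-sv1-dark-0.fits", [10, 11], forests_by_targetid),
            ("coadd-sv1-dark-1.fits", [12], forests_by_targetid),
        ]
        with context.Pool(processes=2) as pool:
            pool.starmap(read_file, arguments)

        print(dict(forests_by_targetid))  # merged results from all workers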
py/picca/delta_extraction/data_catalogues/desisim_mocks.py: 55 changes (36 additions & 19 deletions)
@@ -60,6 +60,10 @@ def __init__(self, config):

self.logger = logging.getLogger(__name__)
super().__init__(config)
if self.use_non_coadded_spectra:
self.logger.warning('the "use_non_coadded_spectra" option was set, '
'but has no effect on Mocks, will proceed as normal')

def read_data(self):
"""Read the spectra and formats its data as Forest instances.
@@ -93,26 +97,39 @@ def read_data(self):

grouped_catalogue = self.catalogue.group_by(["HEALPIX", "SURVEY"])
arguments=[]
if self.num_processors>1:
context = multiprocessing.get_context('fork')
manager = multiprocessing.Manager()
forests_by_targetid = manager.dict()

for (index,
(healpix, survey)), group in zip(enumerate(grouped_catalogue.groups.keys),
grouped_catalogue.groups):

filename = (
f"{self.input_directory}/{healpix//100}/{healpix}/spectra-"
f"{in_nside}-{healpix}.fits")
arguments.append((filename,group,forests_by_targetid))

self.logger.info(f"reading data from {len(arguments)} files")
with context.Pool(processes=self.num_processors) as pool:

pool.starmap(self.read_file, arguments)
else:
forests_by_targetid = {}
for (index,
(healpix, survey)), group in zip(enumerate(grouped_catalogue.groups.keys),
grouped_catalogue.groups):

filename = (
f"{self.input_directory}/{healpix//100}/{healpix}/spectra-"
f"{in_nside}-{healpix}.fits")
self.logger.progress(
f"Read {index} of {len(grouped_catalogue.groups.keys)}. "
f"num_data: {len(forests_by_targetid)}"
)
self.read_file(filename, group, forests_by_targetid)

self.num_processors = multiprocessing.cpu_count() // 2
context = multiprocessing.get_context('fork')
pool = context.Pool(processes=self.num_processors)
manager = multiprocessing.Manager()
forests_by_targetid = manager.dict()

for (index,
(healpix, survey)), group in zip(enumerate(grouped_catalogue.groups.keys),
grouped_catalogue.groups):

filename = (
f"{self.input_directory}/{healpix//100}/{healpix}/spectra-"
f"{in_nside}-{healpix}.fits")
arguments.append((filename,group,forests_by_targetid))

self.logger.info(f"reading data from {len(arguments)} files")
pool.starmap(self.read_file,arguments)

pool.close()
if len(forests_by_targetid) == 0:
raise DataError("No Quasars found, stopping here")
self.forests = list(forests_by_targetid.values())
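
Before this change the mocks reader hardcoded multiprocessing.cpu_count() // 2 (the deleted lines above); it now relies on self.num_processors, parsed once upstream, with 0 acting as an "auto" sentinel. A sketch of that resolution rule:

    import multiprocessing

    def resolve_num_processors(requested):
        # 0 is the configured default and means "auto": keep the old
        # behaviour of using half the available cores
        if requested == 0:
            return multiprocessing.cpu_count() // 2
        return requested

    assert resolve_num_processors(4) == 4
    print(resolve_num_processors(0))  # e.g. 8 on a 16-core machine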
py/picca/delta_extraction/expected_flux.py: 13 changes (13 additions & 0 deletions)
@@ -2,6 +2,7 @@
classes computing the mean expected flux must inherit. The mean expected flux
is the product of the unabsorbed quasar continuum and the mean transmission
"""
import multiprocessing
from picca.delta_extraction.astronomical_objects.pk1d_forest import Pk1dForest
from picca.delta_extraction.errors import ExpectedFluxError

@@ -29,19 +30,31 @@ class ExpectedFlux:
Arrays must have the same size as the flux array for the corresponding line
of sight forest instance.
num_processors: int
Number of processors to use for multiprocessing-enabled tasks (will be passed
downstream to e.g. ExpectedFlux and Data classes)
out_dir: str (from ExpectedFlux)
Directory where logs will be saved.
"""
def __init__(self, config):
"""Initialize class instance"""
self.los_ids = {}
self.num_processors = None

self.out_dir = config.get("out dir")
if self.out_dir is None:
raise ExpectedFluxError(
"Missing argument 'out dir' required by Dr16ExpectedFlux")
else:
self.out_dir += "Log/"

self.num_processors = config.getint("num processors")
if self.num_processors is None:
raise ExpectedFluxError(
"Missing argument 'num processors' required by Dr16ExpectedFlux")
if self.num_processors == 0:
self.num_processors = (multiprocessing.cpu_count() // 2)


# pylint: disable=no-self-use
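
With the option parsed in the ExpectedFlux base class, subclasses can drop their own copies (the next two diffs do exactly that) and just call super().__init__. A minimal sketch of the pattern, using ValueError in place of picca's ExpectedFluxError:

    import multiprocessing
    from configparser import ConfigParser

    class ExpectedFlux:
        def __init__(self, config):
            # config is a configparser section; getint returns None when
            # the option is absent, which becomes a hard error here
            self.num_processors = config.getint("num processors")
            if self.num_processors is None:
                raise ValueError("Missing argument 'num processors'")
            if self.num_processors == 0:
                self.num_processors = multiprocessing.cpu_count() // 2

    class Dr16ExpectedFlux(ExpectedFlux):
        def __init__(self, config):
            super().__init__(config)  # num_processors set by the base class

    parser = ConfigParser()
    parser.read_dict({"expected flux": {"num processors": "0"}})
    flux = Dr16ExpectedFlux(parser["expected flux"])
    print(flux.num_processors)  # half the machine's cores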
py/picca/delta_extraction/expected_fluxes/dr16_expected_flux.py: 16 changes (2 additions & 14 deletions)
@@ -24,7 +24,6 @@
"limit var lss": (0., 0.3),
"num bins variance": 20,
"num iterations": 5,
"num processors": 1,
"order": 1,
"use constant weight": False,
"use ivar as weight": False,
@@ -125,10 +124,6 @@ class Dr16ExpectedFlux(ExpectedFlux):
num_iterations: int
Number of iterations to determine the mean continuum shape, LSS variances, etc.
num_processors: int
Number of processors to be used to compute the mean continua. None for no
specified number (subprocess will take its default value).
order: int
Order of the polynomial for the continuum fit.
@@ -162,7 +157,6 @@ def __init__(self, config):
self.order = None
self.num_bins_variance = None
self.num_iterations = None
self.num_processors = None
self.use_constant_weight = None
self.use_ivar_as_weight = None
self.__parse_config(config)
@@ -409,11 +403,6 @@ def __parse_config(self, config):
raise ExpectedFluxError(
"Missing argument 'num iterations' required by Dr16ExpectedFlux")

self.num_processors = config.getint("num processors")
if self.num_processors is None:
raise ExpectedFluxError(
"Missing argument 'num processors' required by Dr16ExpectedFlux")

self.order = config.getint("order")
if self.order is None:
raise ExpectedFluxError(
@@ -743,9 +732,8 @@ def compute_expected_flux(self, forests):
f"Continuum fitting: starting iteration {iteration} of {self.num_iterations}"
)
if self.num_processors > 1:
pool = context.Pool(processes=self.num_processors)
forests = pool.map(self.compute_continuum, forests)
pool.close()
with context.Pool(processes=self.num_processors) as pool:
forests = pool.map(self.compute_continuum, forests)
else:
forests = [self.compute_continuum(f) for f in forests]

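
The last hunk above also swaps a manually closed pool for a with block: Pool's context manager terminates the workers on exit, so they are cleaned up even if a task raises mid-map. A before/after sketch with a stand-in for compute_continuum:

    import multiprocessing

    def compute_continuum(forest):
        return forest * 2  # stand-in for Dr16ExpectedFlux.compute_continuum

    if __name__ == "__main__":
        context = multiprocessing.get_context("fork")
        forests = [1, 2, 3]

        # old style (removed): workers linger if map() raises before close()
        #   pool = context.Pool(processes=2)
        #   forests = pool.map(compute_continuum, forests)
        #   pool.close()

        # new style: __exit__ calls terminate() once map() has returned
        with context.Pool(processes=2) as pool:
            forests = pool.map(compute_continuum, forests)
        print(forests)  # [2, 4, 6]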
py/picca/delta_extraction/expected_fluxes/true_continuum.py: 7 changes (0 additions & 7 deletions)
@@ -67,10 +67,6 @@ class TrueContinuum(ExpectedFlux):
of deltas at a given iteration step. Intermediate files will add
'_iteration{num}.fits.gz' to the prefix for intermediate steps and '.fits.gz'
for the final results.
num_processors: int or None
Number of processors to be used to compute the mean continua. None for no
specified number (subprocess will take its default value).
"""

def __init__(self, config):
@@ -91,7 +87,6 @@ def __init__(self, config):
# load variables from config
self.input_directory = None
self.iter_out_prefix = None
self.num_processors = None
self.__parse_config(config)


@@ -130,8 +125,6 @@ def __parse_config(self, config):
"'iter out prefix' should not incude folders. "
f"Found: {self.iter_out_prefix}")

self.num_processors = config.getint("num processors")

self.raw_statistics_filename = config.get("raw statistics file")


