Skip to content

Commit

Permalink
Merge 6018172 into d434fec
Browse files Browse the repository at this point in the history
  • Loading branch information
p-slash committed Aug 18, 2022
2 parents d434fec + 6018172 commit 6bde819
Show file tree
Hide file tree
Showing 10 changed files with 6,755 additions and 50 deletions.
Binary file modified docs/delta_extraction/data_model/data_model-ExpectedFlux.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/delta_extraction/data_model/data_model-FullDataModel.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified docs/delta_extraction/data_model/data_model-Main.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6,641 changes: 6,640 additions & 1 deletion docs/delta_extraction/data_model/data_model.drawio

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions py/picca/delta_extraction/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ def __format_data_section(self):
if key not in section:
section[key] = str(value)

# add output directory and num processors
section["out dir"] = self.out_dir
if "num processors" in accepted_options and "num processors" not in section:
section["num processors"] = str(self.num_processors)

# finally add the information to self.data
self.data = (DataType, section)

Expand Down
94 changes: 71 additions & 23 deletions py/picca/delta_extraction/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
classes loading data must inherit
"""
import logging
import multiprocessing

import numpy as np
import fitsio
Expand Down Expand Up @@ -32,6 +33,7 @@
"minimum number pixels in forest",
"out dir",
"rejection log file",
"num processors",
]

defaults = {
Expand All @@ -49,6 +51,47 @@

accepted_analysis_type = ["BAO 3D", "PK 1D"]

def _save_deltas_one_healpix(out_dir, healpix, forests):
    """Save the deltas that belong to one healpix.

    This is a module-level function (not a Data method) so it can be
    dispatched through multiprocessing.Pool.starmap.

    Arguments
    ---------
    out_dir: str
    Parent directory to save deltas.

    healpix: int
    Healpix number; determines the output filename delta-{healpix}.fits.gz.

    forests: List of Forest
    List of forests to save into one file.

    Return
    ------
    header_n_size: List of (header, size)
    List of forest.header and forest.size to later
    add to rejection log as accepted.
    """
    header_n_size = []
    # use a context manager so the FITS file is closed even if a write fails
    with fitsio.FITS(f"{out_dir}/Delta/delta-{healpix}.fits.gz",
                     'rw',
                     clobber=True) as results:
        for forest in forests:
            header = forest.get_header()
            cols, names, units, comments = forest.get_data()
            results.write(cols,
                          names=names,
                          header=header,
                          comment=comments,
                          units=units,
                          extname=str(forest.los_id))

            # collect (header, size) so the caller can add each forest to the
            # rejection log as "accepted"; the log cannot be updated here
            # because this function runs in worker processes
            header_n_size.append((header, forest.flux.size))

    return header_n_size

class Data:
"""Abstract class from which all classes loading data must inherit.
Expand Down Expand Up @@ -117,6 +160,7 @@ def __init__(self, config):
self.out_dir = None
self.rejection_log_file = None
self.min_snr = None
self.num_processors = None
self.__parse_config(config)

# rejection log arays
Expand Down Expand Up @@ -232,6 +276,13 @@ def __parse_config(self, config):
"Missing argument 'minimum number pixels in forest' "
"required by Data")

self.num_processors = config.getint("num processors")
if self.num_processors is None:
raise DataError(
"Missing argument 'num processors' required by Data")
if self.num_processors == 0:
self.num_processors = (multiprocessing.cpu_count() // 2)

self.out_dir = config.get("out dir")
if self.out_dir is None:
raise DataError("Missing argument 'out dir' required by Data")
Expand Down Expand Up @@ -404,30 +455,27 @@ def save_deltas(self):
"""Save the deltas."""
healpixs = np.array([forest.healpix for forest in self.forests])
unique_healpixs = np.unique(healpixs)
healpixs_indexs = {
healpix: np.where(healpixs == healpix)[0]
for healpix in unique_healpixs
}

for healpix, indexs in sorted(healpixs_indexs.items()):
results = fitsio.FITS(
f"{self.out_dir}/Delta/delta-{healpix}.fits.gz",
'rw',
clobber=True)
for index in indexs:
forest = self.forests[index]
header = forest.get_header()
cols, names, units, comments = forest.get_data()
results.write(cols,
names=names,
header=header,
comment=comments,
units=units,
extname=str(forest.los_id))

# store information for logs
self.add_to_rejection_log(header, forest.flux.size, "accepted")
results.close()
arguments = []
for healpix in unique_healpixs:
this_idx = np.nonzero(healpix == healpixs)[0]
grouped_forests = [self.forests[i] for i in this_idx]
arguments.append((self.out_dir, healpix, grouped_forests))

if self.num_processors > 1:
context = multiprocessing.get_context('fork')
with context.Pool(processes=self.num_processors) as pool:
header_n_sizes = pool.starmap(_save_deltas_one_healpix,
arguments)
else:
header_n_sizes = []
for args in arguments:
header_n_sizes.append(_save_deltas_one_healpix(*args))

# store information for logs
for header_n_size in header_n_sizes:
for header, size in header_n_size:
self.add_to_rejection_log(header, size, "accepted")

self.save_rejection_log()

Expand Down
19 changes: 6 additions & 13 deletions py/picca/delta_extraction/data_catalogues/desi_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""This module defines the class DesiData to load DESI data
"""
import logging
import multiprocessing
import time
import numpy as np

Expand All @@ -19,10 +18,12 @@
from picca.delta_extraction.utils_pk1d import spectral_resolution_desi, exp_diff_desi
from picca.delta_extraction.utils import update_accepted_options, update_default_options

accepted_options = update_accepted_options(accepted_options, accepted_options_quasar_catalogue)
accepted_options = update_accepted_options(
accepted_options,
["blinding", "num processors", "use non-coadded spectra", "wave solution"])
accepted_options = sorted(
list(
set(accepted_options + accepted_options_quasar_catalogue + [
"blinding", "use non-coadded spectra",
"wave solution"
])))

defaults = update_default_options(defaults, {
"delta lambda": 0.8,
Expand Down Expand Up @@ -105,7 +106,6 @@ def __init__(self, config):

# load variables from config
self.blinding = None
self.num_processors = None
self.use_non_coadded_spectra = None
self.__parse_config(config)

Expand Down Expand Up @@ -148,13 +148,6 @@ def __parse_config(self, config):
f"are {ACCEPTED_BLINDING_STRATEGIES}. "
f"Found '{self.blinding}'")

self.num_processors = config.getint("num processors")
if self.num_processors is None:
raise DataError(
"Missing argument 'num processors' required by DesiData")
if self.num_processors == 0:
self.num_processors = (multiprocessing.cpu_count() // 2)

self.use_non_coadded_spectra = config.getboolean(
"use non-coadded spectra")
if self.use_non_coadded_spectra is None:
Expand Down
1 change: 1 addition & 0 deletions py/picca/tests/delta_extraction/data/.config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ best obs = False
keep bal = False
minimal snr pk1d = 1
minimal snr bao3d = 0
num processors = 1

[corrections]
num corrections = 4
Expand Down
44 changes: 31 additions & 13 deletions py/picca/tests/delta_extraction/data_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def test_data(self):
"wave solution": "log",
"delta log lambda": 3e-4,
"input directory": f"{THIS_DIR}/data/",
"num processors": 1,
}})
for key, value in defaults_data.items():
if key not in config["data"]:
Expand All @@ -116,6 +117,7 @@ def test_data(self):
"wave solution": "log",
"delta log lambda": 3e-4,
"input directory": f"{THIS_DIR}/data/",
"num processors": 1,
}})
for key, value in defaults_data.items():
if key not in config["data"]:
Expand Down Expand Up @@ -350,6 +352,26 @@ def test_data_parse_config(self):
Data(config["data"])
self.compare_error_message(context_manager, expected_message)

# create a Data instance with missing num_processors
config = ConfigParser()
config.read_dict({"data": {
"wave solution": "lin",
"delta lambda": 0.8,
"lambda max": 5500.0,
"lambda max rest frame": 1200.0,
"lambda min": 3600.0,
"lambda min rest frame": 1040.0,
"analysis type": "BAO 3D",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
}})
expected_message = (
"Missing argument 'num processors' required by Data"
)
with self.assertRaises(DataError) as context_manager:
Data(config["data"])
self.compare_error_message(context_manager, expected_message)

# create a Data instance with missing out_dir
config = ConfigParser()
config.read_dict({"data": {
Expand All @@ -362,6 +384,7 @@ def test_data_parse_config(self):
"analysis type": "BAO 3D",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
"num processors": 1,
}})
expected_message = (
"Missing argument 'out dir' required by Data"
Expand All @@ -382,6 +405,7 @@ def test_data_parse_config(self):
"analysis type": "BAO 3D",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
"num processors": 1,
"out dir": f"{THIS_DIR}/results",
}})
expected_message = (
Expand All @@ -403,6 +427,7 @@ def test_data_parse_config(self):
"analysis type": "BAO 3D",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
"num processors": 1,
"out dir": f"{THIS_DIR}/results",
"rejection log file": "results/rejection_log.fits.gz",
}})
Expand All @@ -426,6 +451,7 @@ def test_data_parse_config(self):
"analysis type": "BAO 3D",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
"num processors": 1,
"out dir": f"{THIS_DIR}/results",
"rejection log file": "rejection_log.txt",
}})
Expand All @@ -451,6 +477,7 @@ def test_data_parse_config(self):
"analysis type": "BAO 3D",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
"num processors": 1,
"out dir": f"{THIS_DIR}/results",
"rejection log file": "rejection_log.fits.gz",
}})
Expand All @@ -476,6 +503,7 @@ def test_data_parse_config(self):
"lambda abs IGM": "LYA",
"input directory": f"{THIS_DIR}/data",
"minimum number pixels in forest": 50,
"num processors": 1,
"out dir": f"{THIS_DIR}/results",
"rejection log file": "rejection_log.fits.gz",
}})
Expand All @@ -499,6 +527,7 @@ def test_data_filter_forests(self):
"delta log lambda": 3e-4,
"delta log lambda rest frame": 3e-4,
"input directory": f"{THIS_DIR}/data/",
"num processors": 1,
}})
for key, value in defaults_data.items():
if key not in config["data"]:
Expand Down Expand Up @@ -538,6 +567,7 @@ def test_data_filter_forests(self):
"delta log lambda": 3e-4,
"delta log lambda rest frame": 3e-4,
"input directory": f"{THIS_DIR}/data/",
"num processors": 1,
}})
for key, value in defaults_data.items():
if key not in config["data"]:
Expand All @@ -564,6 +594,7 @@ def test_data_filter_forests(self):
"delta log lambda rest frame": 3e-4,
"input directory": f"{THIS_DIR}/data/",
"minimal snr bao3d": 100000000,
"num processors": 1,
}})
for key, value in defaults_data.items():
if key not in config["data"]:
Expand Down Expand Up @@ -635,23 +666,10 @@ def test_desi_data_parse_config(self):
data._DesiData__parse_config(config["data"])
self.compare_error_message(context_manager, expected_message)

# run __parse_config with missing num_processors
config = ConfigParser()
config.read_dict({"data": {
"blinding": "none",
}})
expected_message = (
"Missing argument 'num processors' required by DesiData"
)
with self.assertRaises(DataError) as context_manager:
data._DesiData__parse_config(config["data"])
self.compare_error_message(context_manager, expected_message)

# run __parse_config with missing 'use_non_coadded_spectra'
config = ConfigParser()
config.read_dict({"data": {
"blinding": "none",
"num processors": 1,
}})
expected_message = (
"Missing argument 'use non-coadded spectra' required by DesiData"
Expand Down
1 change: 1 addition & 0 deletions py/picca/tests/delta_extraction/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def setup_pk1d_forest(absorber):
"lambda max": 7235.0,
"lambda min rest frame": 2900.0,
"lambda max rest frame": 3120.0,
"num processors": 1,
}

desi_mock_data_kwargs = {
Expand Down

0 comments on commit 6bde819

Please sign in to comment.